Eseguire più query Structured Streaming sullo stesso cluster

Molti clienti eseguono più query di Structured Streaming nello stesso cluster Azure Databricks. Anche se questo modello è supportato, Databricks consiglia di limitare il numero di query per ogni cluster per evitare problemi di ridimensionamento e colli di bottiglia delle prestazioni. Nel calcolo serverless Azure Databricks gestisce automaticamente il ridimensionamento, quindi queste considerazioni vengono gestite automaticamente. Se si utilizza il compute classico, in cui si controlla il dimensionamento del driver e dell'executor, questa pagina descrive i principali colli di bottiglia da tenere presenti e come affrontarli.

Nota

Databricks consiglia di usare le pipeline dichiarative di Lakeflow Spark per i nuovi carichi di lavoro di streaming, che gestisce automaticamente la complessità dell'infrastruttura. Vedere Pipeline dichiarative di Lakeflow Spark.

Quando usare più query nello stesso cluster

L'esecuzione di più query di streaming nello stesso cluster riduce i costi dell'infrastruttura, soprattutto quando si hanno molti flussi di piccole dimensioni che non richiedono risorse di calcolo dedicate. Il compromesso chiave è un errore condiviso: se il cluster ha esito negativo, ogni flusso su di esso ha esito negativo. Per le pipeline mission-critical, questa modalità di guasto comune è spesso inaccettabile.

Per i carichi di lavoro che combinano flussi critici e non critici, Databricks consiglia quanto segue:

  • Assegnare a ogni flusso una priorità in base all'impatto aziendale.
  • Posizionare flussi cruciali in cluster dedicati, anche a costi più elevati.
  • Co-individuare i flussi con priorità inferiore per condividere il calcolo e ridurre i costi.

Ridimensionamento del driver

Il driver è una risorsa condivisa. Più query condividono la stessa CPU, la stessa memoria, lo scheduler DAG, lo scheduler delle attività ed esecuzione lato driver delle UDF (ad esempio, foreachBatch). Quando si eseguono numerosi flussi concorrenti, prestare attenzione a questi specifici colli di bottiglia, oltre alla normale allocazione di CPU e memoria:

  • Sovraccarico di Auto Loader: se i flussi usano Auto Loader, il rilevamento dei file e l'elenco delle directory aumentano il carico sul driver.
  • Limiti delle risorse a livello di sistema operativo (file aperti): l'esecuzione simultanea di un volume elevato di flussi basati su file (ad esempio FileStreamSource o auto loader) su un singolo driver può esaurire i limiti del descrittore di file aperti a livello di utente, che possono causare errori casuali del flusso.
  • Backpressure del bus di listener: un numero elevato di query di streaming simultanee può causare la backpressure sul bus della StreamingQueryListener singola sessione Spark. Tutti gli eventi (inclusi onQueryIdle) vengono inviati a questo singolo bus e un backlog di eventi di grandi dimensioni può ritardare gravemente i gestori asincroni onQueryProgress e influire sulla stabilità del cluster.
  • Operazioni costose del driver: evitare di chiamare collect() o altre costose operazioni di DataFrame sul driver, a meno che non sia assolutamente necessario, per evitare la materializzazione di set di risultati di grandi dimensioni e il verificarsi di errori di memoria esaurita (OOM).

Risolvere i problemi di contesa del driver

Se si verificano arresti anomali del driver a causa di problemi di OOM o di contesa delle risorse:

  1. Monitorare le metriche dei driver nell'interfaccia utente di Spark. Se viene visualizzato un utilizzo elevato di CPU, memoria o disco, regolare il ridimensionamento del driver nelle impostazioni di calcolo del cluster.
  2. Se i problemi persistono, controlla che il codice non esegua operazioni che richiedono molta memoria o funzioni definite dall'utente sul driver.
  3. Se non è possibile ridimensionare ulteriormente il driver in verticale, Databricks consiglia vivamente di suddividere i job tra più cluster per aggirare questi colli di bottiglia di scalabilità dei nodi condivisi.

Dimensionamento dell'executor

Con più query in esecuzione sullo stesso cluster, tutte le query condividono gli slot di esecuzione sugli executor. Le fasi di una query possono occupare gli slot disponibili, causando ritardi e attesa indefinita per altre query. Spark usa un mapping 1:1 tra gli slot di attività e i core disponibili. Assicurarsi che siano disponibili core sufficienti se le query devono essere eseguite contemporaneamente.

In generale, gli executor potrebbero eseguire operazioni a elevato utilizzo di memoria rispetto al nodo driver. Ottimizza, se necessario, i parametri della JVM dell'executor e di allocazione della memoria off-heap per gestire il carico dell'applicazione. Assicurarsi che i nodi executor vengano ridimensionati in modo appropriato in termini di CPU, memoria e spazio su disco e ridimensionare verticalmente, se necessario. Se il ridimensionamento verticale non è possibile, è consigliabile aggiungere nodi di lavoro aggiuntivi al cluster.

Nota

Alcune di queste modifiche potrebbero richiedere il riavvio del cluster per avere effetto.

Usa i pool dello scheduler

È possibile configurare pool di schedulazione per assegnare capacità di calcolo alle query durante l'esecuzione di più query di streaming dallo stesso codice sorgente.

Per impostazione predefinita, tutte le query avviate in un notebook vengono eseguite nello stesso pool di pianificazione condiviso. I job di Apache Spark generati dai trigger di tutte le query di streaming presenti in un notebook vengono eseguiti uno dopo l'altro in ordine FIFO (first in, first out). Ciò può causare ritardi non necessari nelle query, perché non condividono in modo efficiente le risorse del cluster.

I pool del pianificatore consentono di dichiarare quali query di Structured Streaming condividono le risorse di calcolo.

Nell'esempio seguente viene assegnato a query1 un pool dedicato, mentre query2 e query3 condividono un pool di pianificazione.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Nota

La configurazione della proprietà locale deve trovarsi nella stessa cella del notebook in cui si avvia la query di streaming.

Per ulteriori informazioni sui pool del fair scheduler, consultare la documentazione del fair scheduler di Apache Spark.

Considerazioni sulle query stateful

Per le query con stato in esecuzione nello stesso cluster, tenere presente quanto segue:

  • Utilizzare RocksDB come provider dell'archivio di stato per evitare problemi di OOM e pause del GC. RocksDB è il provider dell'archivio stati predefinito in Databricks Runtime 17.3 e versioni successive. Vedere Configurare l'archivio di stato RocksDB su Azure Databricks.
  • Ottimizza le partizioni di shuffle in base ai requisiti dell'applicazione. Per le fasi stateful, Spark pianifica i task in proporzione al numero di partizioni di shuffle.
  • Limita l'utilizzo della memoria di RocksDB per ciascun nodo per evitare errori OOM causati dall'utilizzo di memoria off-heap. Questa operazione viene gestita automaticamente in Databricks Runtime 17.3 e versioni successive, ma richiede la configurazione manuale nelle versioni precedenti. Vedere Utilizzo della memoria cap RocksDB.
  • Evitare di accumulare troppe partizioni sullo stesso nodo esecutore. Le operazioni di manutenzione sull'archivio di stato, compresi il caricamento e la pulizia degli snapshot, vengono eseguite a livello di singolo nodo. L'assegnazione di troppe partizioni a un nodo esecutore può causare una carenza di risorse per la manutenzione e tempi di ripristino più lunghi, a causa della disponibilità di un numero inferiore di snapshot completi.