Acompanhamento assíncrono do progresso

O acompanhamento assíncrono do progresso reduz a latência nos pipelines de Streaming Estruturado ao permitir que as consultas atualizem assíncronamente o progresso dos checkpoints e processem dados em cada micro-lote.

Durante o processamento de consultas, o Structured Streaming persiste e gere deslocamentos para medir o progresso da consulta em e offsetLogcommitLog em cada micro-lote. Sem acompanhamento assíncrono do progresso, as operações de gestão de offset afetam diretamente a latência de processamento porque o processamento de dados não pode continuar até serem concluídos.

Acompanhamento de Progresso Assíncrono

Observação

O acompanhamento assíncrono do progresso não é compatível com disparadores Trigger.once ou Trigger.availableNow. Se ativadas, as consultas do Structured Streaming com Trigger.once ou Trigger.availableNow falham.

Opções de configuração

Opção Padrão Descrição
asyncProgressTrackingEnabled false Se deve ativar o acompanhamento assíncrono do progresso.
asyncProgressTrackingCheckpointIntervalMs 1000 O intervalo em milissegundos entre as escritas dos deslocamentos e os commits de conclusão.

Ativar o acompanhamento assíncrono do progresso

Para permitir o acompanhamento assíncrono do progresso, defina asyncProgressTrackingEnabled para true:

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Melhore a taxa de transferência com a frequência de checkpoints

A frequência padrão dos pontos de controlo, de 1000 milissegundos, tem um bom rendimento para a maioria das consultas. Quando as operações de gestão de offset ocorrem mais rapidamente do que o acompanhamento de progresso assíncrono pode processá-las, desenvolve-se um acúmulo de operações de gestão de offsets. Para evitar que o atraso cresça ainda mais, o acompanhamento assíncrono do progresso pode bloquear ou abrandar o processamento de dados, podendo corroer os benefícios esperados de latência.

Neste cenário, o Databricks recomenda que aumente o intervalo de pontos de controlo:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Observação

O tempo de recuperação de falhas aumenta com o intervalo do ponto de controlo. Em caso de falha, um pipeline deve reprocessar todos os dados desde o checkpoint bem-sucedido anterior. Antes de fazer esta alteração na produção, considere o balanço entre a redução da latência durante o processamento regular e o tempo de recuperação em caso de falha.

Desligue o acompanhamento assíncrono do progresso

Quando o acompanhamento assíncrono do progresso está ativado, o fluxo não garante o progresso dos pontos de controlo para cada lote. Deves controlar o progresso antes de poderes desligar esta funcionalidade.

Para desligar, siga estes passos:

  1. Processe pelo menos dois micro-lotes com asyncProgressTrackingEnabled definido para true e asyncProgressTrackingCheckpointIntervalMs definido para 0:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Pare com a consulta:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Desligue o acompanhamento assíncrono do progresso e reinicie a consulta:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Se desativar o acompanhamento assíncrono do progresso sem seguir os passos acima, poderá encontrar o seguinte erro:

java.lang.IllegalStateException: batch x doesn't exist

Nos logs de driver, você pode ver o seguinte erro:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Limitações

  • Para os sinks Kafka, o acompanhamento assíncrono de progresso suporta apenas pipelines sem estado.
  • O acompanhamento assíncrono do progresso não garante o processamento ponta a ponta exatamente uma vez, porque os intervalos de compensação de um lote podem alterar-se em caso de falha. Alguns lavatórios, como o Kafka, nunca fornecem garantias exatas.