Acompanhamento de progresso assíncrono

O acompanhamento de progresso assíncrono reduz a latência para pipelines de Streaming Estruturado, permitindo que as consultas atualizem de forma assíncrona o progresso do ponto de verificação e processem dados em cada microlote.

Durante o processamento de consulta, o Streaming Estruturado persiste e gerencia deslocamentos no offsetLog e no commitLog para medir o progresso da consulta em cada microlote. Sem o acompanhamento de progresso assíncrono, as operações de gerenciamento de offset afetam diretamente a latência de processamento porque o processamento de dados não pode continuar até que sejam concluídas.

Acompanhamento de Progresso Assíncrono

Nota

O acompanhamento de progresso assíncrono não é compatível com Trigger.once ou Trigger.availableNow gatilhos. Se habilitado, as consultas de Streaming Estruturado com Trigger.once ou Trigger.availableNow falham.

Opções de configuração

Opção Padrão Descrição
asyncProgressTrackingEnabled false Se deseja habilitar o acompanhamento de progresso assíncrono.
asyncProgressTrackingCheckpointIntervalMs 1000 O intervalo em milissegundos entre gravações para deslocamentos e confirmações de conclusão.

Habilitar o acompanhamento de progresso assíncrono

Para habilitar o acompanhamento de progresso assíncrono, defina asyncProgressTrackingEnabled como true:

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Melhore a taxa de transferência pela frequência de pontos de verificação

A frequência de ponto de verificação padrão de 1000 milissegundos tem uma boa taxa de transferência para a maioria das consultas. Quando as operações de gerenciamento de deslocamento ocorrem mais rápido do que o rastreamento de progresso assíncrono pode processá-las, um acúmulo de operações pendentes de gerenciamento de deslocamento é formado. Para evitar que o backlog cresça ainda mais, o acompanhamento de progresso assíncrono pode bloquear ou retardar o processamento de dados, potencialmente comprometendo os benefícios de baixa latência esperados.

Nesse cenário, o Databricks recomenda que você aumente o intervalo de ponto de verificação:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Nota

O tempo de recuperação de falha aumenta com o tempo de intervalo do ponto de verificação. Em caso de falha, o pipeline deve reprocessar todos os dados desde o ponto de verificação bem-sucedido anterior. Antes de fazer essa alteração na produção, considere a compensação entre a latência mais baixa durante o processamento regular em comparação com o tempo de recuperação em caso de falha.

Desativar o acompanhamento de progresso assíncrono

Quando o acompanhamento de progresso assíncrono está habilitado, o fluxo não garante o progresso do ponto de verificação para cada lote. Você deve registrar o progresso antes de desativar esse recurso.

Para desativar, siga estas etapas:

  1. Processe pelo menos dois microlotes com asyncProgressTrackingEnabled definido como true e asyncProgressTrackingCheckpointIntervalMs definido como 0:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Interrompa a consulta:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Desative o acompanhamento de progresso assíncrono e reinicie a consulta:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Se você desativar o acompanhamento de progresso assíncrono sem seguir as etapas acima, poderá encontrar o seguinte erro:

java.lang.IllegalStateException: batch x doesn't exist

Nos logs de driver, você pode ver o seguinte erro:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Limitações

  • Para coletores Kafka, o acompanhamento de progresso assíncrono só dá suporte a pipelines sem estado.
  • O acompanhamento do progresso assíncrono não garante o processamento de ponta a ponta exatamente-uma-vez, pois os intervalos de offset para um lote podem ser alterados em caso de falha. Alguns coletores, como Kafka, nunca fornecem garantias de exatamente uma vez.