Programação assíncrona no SDK do Azure para Java

Este artigo descreve o modelo de programação assíncrona no SDK do Azure para Java.

Inicialmente, o SDK do Azure continha apenas APIs assíncronas sem bloqueio para interagir com os serviços do Azure. Essas APIs permitem que você use o SDK do Azure para criar aplicativos escaláveis que usam recursos do sistema de forma eficiente. No entanto, o SDK do Azure para Java também contém clientes síncronos para servir um público mais vasto e tornar as bibliotecas de clientes acessíveis para utilizadores não familiarizados com programação assíncrona. (Consulte Approachable nas diretrizes de design do SDK do Azure.) Como tal, todas as bibliotecas cliente Java no SDK para Java do Azure oferecem clientes assíncronos e síncronos. No entanto, utilize os clientes assíncronos para sistemas de produção para maximizar a utilização dos recursos do sistema.

Fluxos reativos no SDK do Azure para Java

Se olhar para a secção Async Service Clients nas Diretrizes de Design Java SDK do Azure, verá que, em vez de usar CompletableFuture fornecido pela Java 8, as APIs assíncronas usam tipos reativos. Porque é que a equipa do SDK do Azure escolheu tipos reativos em vez de tipos que estão disponíveis nativamente no JDK?

O Java 8 introduziu recursos como Streams, Lambdas e CompletableFuture. Estas funcionalidades oferecem muitas capacidades, mas têm algumas limitações.

CompletableFuture oferece capacidades baseadas em chamadas de retorno, sem bloqueio, e a interface CompletionStage permite compor facilmente uma série de operações assíncronas. Os Lambdas tornam essas APIs baseadas em push mais legíveis. Os fluxos fornecem operações de estilo funcional para lidar com uma coleção de elementos de dados. No entanto, os fluxos são síncronos e não podem ser reutilizados. CompletableFuture permite que você faça uma única solicitação, fornece suporte para um retorno de chamada e espera uma única resposta. No entanto, muitos serviços de nuvem exigem a capacidade de transmitir dados - Hubs de Eventos, por exemplo.

Os fluxos reativos podem ajudar a ultrapassar estas limitações transmitindo elementos de uma fonte para um assinante. Quando um assinante solicita dados de uma fonte, a fonte envia qualquer número de resultados de volta. Não precisa enviá-los todos de uma vez. A transferência acontece ao longo de um período de tempo, sempre que a fonte tem dados para enviar.

Nesse modelo, o assinante registra manipuladores de eventos para processar dados quando eles chegam. Essas interações baseadas em push notificam o assinante por meio de sinais distintos:

  • Uma onSubscribe() chamada indica que a transferência de dados está prestes a começar.
  • Uma onError() chamada indica que houve um erro, que também marca o fim da transferência de dados.
  • Uma onComplete() chamada indica a conclusão bem-sucedida da transferência de dados.

Ao contrário do Java Streams, os fluxos reativos tratam os erros como eventos de primeira classe. Os fluxos reativos têm um canal dedicado para que a fonte comunique quaisquer erros ao assinante. Além disso, os fluxos reativos permitem que o assinante negocie a taxa de transferência de dados para transformar esses fluxos em um modelo push-pull.

A especificação Reative Streams fornece um padrão para como a transferência de dados deve ocorrer. Em um alto nível, a especificação define as quatro interfaces a seguir e especifica regras sobre como essas interfaces devem ser implementadas.

  • O Publisher é a fonte de um fluxo de dados.
  • O assinante é o consumidor de um fluxo de dados.
  • A subscrição gere o estado da transferência de dados entre um editor e um subscritor.
  • O processador é um editor e um assinante.

Algumas bibliotecas de Java bem conhecidas fornecem implementações desta especificação, como RxJava, Akka Streams, Vert.x e Project Reactor.

O SDK do Azure para Java adotou o Project Reator para oferecer suas APIs assíncronas. O principal fator que impulsionou essa decisão foi fornecer uma integração suave com o Spring Webflux, que também usa o Project Reator. Outro fator que contribuiu para escolher o Project Reator em vez do RxJava foi que o Project Reator usa Java 8, mas o RxJava, na época, ainda estava no Java 7. O Project Reator também oferece um rico conjunto de operadores que podem ser compostos e permitem que você escreva código declarativo para a construção de pipelines de processamento de dados. Outra coisa legal sobre o Project Reator é que ele tem adaptadores para converter tipos de Project Reator para outros tipos de implementação populares.

Compare APIs de operações síncronas e assíncronas

Aprendeste sobre clientes síncronos e opções para clientes assíncronos. A tabela seguinte resume como são as APIs desenhadas com estas opções:

Tipo de API Sem valor Valor único Vários valores
Java padrão - APIs síncronas void T Iterable<T>
Java padrão - APIs assíncronas CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de Fluxos Reativos Publisher<Void> Publisher<T> Publisher<T>
Projeto de Implementação de Reatores de Correntes Reativas Mono<Void> Mono<T> Flux<T>

Por uma questão de completude, vale a pena mencionar que o Java 9 introduziu a classe Flow que inclui as quatro interfaces de fluxos reativos. No entanto, essa classe não inclui nenhuma implementação.

Usar APIs assíncronas no SDK do Azure para Java

A especificação de fluxos reativos não diferencia entre tipos de editores. Na especificação de fluxos reativos, os publicadores simplesmente produzem zero ou mais elementos de dados. Em muitos casos, há uma distinção útil entre um editor que produz no máximo um elemento de dados e um que produz zero ou mais. Em APIs baseadas em nuvem, essa distinção indica se uma solicitação retorna uma resposta de valor único ou uma coleção. O Project Reator fornece dois tipos para fazer esta distinção - Mono e Flux. Uma API que retorna um Mono conterá uma resposta que tem no máximo um valor, e uma API que retorna um Flux conterá uma resposta que tem zero ou mais valores.

Por exemplo, suponha que você use um ConfigurationAsyncClient para recuperar uma configuração armazenada usando o serviço de Configuração de Aplicativo do Azure. (Para obter mais informações, consulte O que é a Configuração do Aplicativo do Azure?.)

Se você criar um ConfigurationAsyncClient e chamar getConfigurationSetting() no cliente, ele retornará um Mono, o que indica que a resposta contém um único valor. No entanto, chamar este método por si só não faz nada. O cliente ainda não fez um pedido ao serviço Azure App Configuration. Neste estágio, o Mono<ConfigurationSetting> retornado por esta API é apenas uma "configuração" do pipeline de processamento de dados. Esta arquitetura significa que a configuração necessária para consumir os dados está completa. Para realmente acionar a transferência de dados (ou seja, para fazer a solicitação para o serviço e obter a resposta), você deve se inscrever para o devolvido Mono. Então, ao lidar com esses fluxos reativos, deve-se lembrar de invocar subscribe() porque nada acontece até que isso seja feito.

O exemplo a seguir mostra como subscrever ao Mono e imprimir no console o valor de configuração.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observe que, depois de chamar getConfigurationSetting() no cliente, o código de exemplo assina o resultado e fornece três lambdas separados. O primeiro lambda consome dados recebidos do serviço, que são ativados após uma resposta bem-sucedida. O segundo lambda é ativado se houver um erro ao recuperar a configuração. O terceiro lambda é invocado quando o fluxo de dados é concluído, o que significa que não são esperados mais elementos de dados desse fluxo.

Observação

Tal como acontece com toda a programação assíncrona, depois de a subscrição ser criada, a execução prossegue como habitualmente. Se não houver nada que mantenha o programa ativo e a correr, pode terminar antes de a operação assíncrona terminar. O tópico principal que chamou subscribe() não espera até fazeres a chamada de rede para Azure App Configuration e receberes uma resposta. Em sistemas de produção, você pode continuar a processar outra coisa, mas neste exemplo você pode adicionar um pequeno atraso chamando Thread.sleep() ou usando um CountDownLatch para dar à operação assíncrona a chance de ser concluída.

Como mostrado no exemplo a seguir, as APIs que retornam um Flux também seguem um padrão semelhante. A diferença é que o primeiro callback passado ao método subscribe() é chamado várias vezes para cada elemento de dados na resposta. O erro ou os callbacks de conclusão são chamados exatamente uma vez e são considerados como sinais finais. Nenhum outro callback é invocado se um desses sinais for recebido do publicador.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contrapressão

O que acontece quando a fonte está produzindo os dados a um ritmo mais rápido do que o assinante pode lidar? O assinante pode ficar sobrecarregado com dados, o que pode levar a erros de falta de memória. O assinante precisa de uma maneira de se comunicar com o editor para diminuir a velocidade quando ele não consegue acompanhar. Por predefinição, quando invoca subscribe() num(a) Flux, como se mostra no exemplo anterior, o subscritor está a solicitar um fluxo ilimitado de dados, indicando ao publicador que envie os dados o mais rapidamente possível. Este comportamento nem sempre é desejável, e o assinante pode ter de controlar a taxa de publicação através de "contrapressão". Backpressure permite que o assinante assuma o controle do fluxo de elementos de dados. Um assinante solicita um número limitado de elementos de dados que pode gerir. Depois de o assinante terminar de processar estes elementos, pode solicitar mais. Usando backpressure, é possível transformar um modelo de push para transferência de dados em um modelo de push-pull.

O exemplo a seguir mostra como você pode controlar a taxa na qual os eventos são recebidos pelo consumidor de Hubs de Eventos:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Quando o assinante "se conecta" pela primeira vez ao editor, o editor entrega ao assinante uma Subscription instância, que gerencia o estado da transferência de dados. Este Subscription é o meio através do qual o assinante pode aplicar backpressure ligando request() para especificar quantos mais elementos de dados ele pode manipular.

Se o assinante solicitar mais do que um elemento de dados cada vez que ligar onNext(), request(10) por exemplo, o editor envia imediatamente os próximos 10 elementos se estiverem disponíveis ou quando estiverem disponíveis. Estes elementos acumulam-se num buffer na extremidade do subscritor e, como cada chamada onNext() solicita mais 10 elementos, a fila de pendentes continua a crescer até que o publicador deixe de ter mais elementos de dados para enviar ou até que o buffer do subscritor atinja a capacidade máxima, resultando em erros de memória insuficiente.

Cancelar uma subscrição de fluxo reativo

Uma subscrição gere o estado da transferência de dados entre um editor e um subscritor. A subscrição mantém-se ativa até que o editor termine de transferir todos os dados para o assinante ou que este deixe de estar interessado em receber dados. Pode cancelar uma subscrição de algumas formas, como mostrado nos exemplos seguintes.

O exemplo a seguir cancela a assinatura descartando o assinante:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

O exemplo a seguir cancela a assinatura chamando o cancel() método em Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusão

Os threads são recursos caros que você não deve desperdiçar esperando respostas de chamadas de serviço remoto. À medida que a adoção de arquiteturas de microserviços aumenta, a necessidade de escalar e utilizar os recursos de forma eficiente torna-se vital. As APIs assíncronas são favoráveis quando há operações ligadas à rede. O SDK do Azure para Java oferece um conjunto avançado de APIs para operações assíncronas para ajudar a maximizar os recursos do sistema. Experimenta os clientes assíncronos.

Para obter mais informações sobre os operadores que melhor se adequam às suas tarefas específicas, consulte Qual operador eu preciso? , no Guia de Referência do Reator 3.

Próximos passos

Agora que você entende melhor os vários conceitos de programação assíncrona, é importante aprender a iterar os resultados. Para obter mais informações sobre as melhores estratégias de iteração e detalhes de como a paginação funciona, consulte Paginação e iteração no SDK do Azure para Java.