Asynchrone Programmierung im Azure SDK für Java

In diesem Artikel wird das asynchrone Programmiermodell im Azure SDK für Java beschrieben.

Das Azure SDK enthielt anfänglich nur nicht blockierende, asynchrone APIs für die Interaktion mit Azure-Diensten. Mit diesen APIs können Sie das Azure SDK verwenden, um skalierbare Anwendungen zu erstellen, die Systemressourcen effizient verwenden. Die Azure SDK für Java enthält jedoch auch synchrone Clients, um ein breiteres Publikum zu erreichen und Clientbibliotheken für Benutzer zugänglich zu machen, die mit der asynchronen Programmierung nicht vertraut sind. (Siehe "Ansatzfähig " in den Azure SDK-Entwurfsrichtlinien.) Daher bieten alle Java-Clientbibliotheken im Azure SDK für Java sowohl asynchrone als auch synchrone Clients an. Verwenden Sie jedoch die asynchronen Clients für Produktionssysteme, um die Nutzung von Systemressourcen zu maximieren.

Reaktive Datenströme im Azure SDK für Java

Wenn Sie sich den Abschnitt Async Service Clients in den Entwurfsrichtlinien Java Azure SDK ansehen, sehen Sie, dass anstelle von CompletableFuture, die von Java 8 bereitgestellt werden, die asynchronen APIs reaktive Typen verwenden. Warum hat das Azure SDK Team reaktive Typen über Typen ausgewählt, die nativ in JDK verfügbar sind?

Java 8 hat Features wie Streams, Lambdas und CompletableFuture eingeführt. Diese Features bieten viele Funktionen, weisen jedoch einige Einschränkungen auf.

CompletableFuture stellt rückrufbasierte, nicht blockierende Funktionen bereit, und die CompletionStage Schnittstelle erleichtert das Verfassen einer Reihe asynchroner Vorgänge. Lambdas machen diese pushbasierten APIs besser lesbar. Datenströme stellen Vorgänge im Funktionalen Stil bereit, um eine Sammlung von Datenelementen zu verarbeiten. Datenströme sind jedoch synchron und können nicht wiederverwendet werden. CompletableFuture ermöglicht es Ihnen, eine einzelne Anforderung zu stellen, Bietet Unterstützung für einen Rückruf und erwartet eine einzelne Antwort. Viele Clouddienste erfordern jedoch die Möglichkeit, Daten zu streamen – z. B. Event Hubs.

Reaktive Streams können helfen, diese Einschränkungen zu überwinden, indem Elemente von einer Quelle an einen Abonnenten gestreamt werden. Wenn ein Abonnent Daten aus einer Quelle anfordert, sendet die Quelle eine beliebige Anzahl von Ergebnissen zurück. Es muss sie nicht alle gleichzeitig senden. Die Übertragung erfolgt über einen bestimmten Zeitraum, wenn die Quelle Daten sendet.

In diesem Modell registriert der Abonnent Ereignishandler, um Daten bei deren Ankunft zu verarbeiten. Diese pushbasierten Interaktionen benachrichtigen den Abonnenten über unterschiedliche Signale:

  • Ein onSubscribe() Aufruf gibt an, dass die Datenübertragung beginnt.
  • Ein onError() Aufruf gibt an, dass ein Fehler aufgetreten ist, der auch das Ende der Datenübertragung kennzeichnet.
  • Ein onComplete() Aufruf zeigt den erfolgreichen Abschluss der Datenübertragung an.

Im Gegensatz zu Java Streams behandeln reaktive Datenströme Fehler als Erstklassenereignisse. Reaktive Datenströme verfügen über einen dedizierten Kanal für die Quelle, um Fehler an den Abonnenten zu kommunizieren. Außerdem ermöglichen reaktive Datenströme es dem Abonnenten, die Datenübertragungsrate auszuhandeln, um diese Datenströme in ein Push-Pull-Modell zu transformieren.

Die Spezifikation "Reaktive Datenströme " stellt einen Standard für die Übertragung von Daten bereit. Auf hoher Ebene definiert die Spezifikation die folgenden vier Schnittstellen und gibt Regeln für die Implementierung dieser Schnittstellen an.

  • Publisher ist die Quelle eines Datenstroms.
  • Abonnent ist der Verbraucher eines Datenstroms.
  • Das Abonnement verwaltet den Status der Datenübertragung zwischen einem Herausgeber und einem Abonnenten.
  • Processor ist sowohl ein Herausgeber als auch ein Abonnent.

Einige bekannte Java-Bibliotheken stellen Implementierungen dieser Spezifikation bereit, wie RxJava, Akka Streams, Vert.x und Project Reactor.

Das Azure SDK für Java hat Project Reactor übernommen, um seine asynchronen APIs anzubieten. Der wichtigste Faktor für diese Entscheidung war die reibungslose Integration mit Spring Webflux, die auch Project Reactor verwendet. Ein weiterer Faktor für die Auswahl von Project Reaktor über RxJava war, dass Project Reactor Java 8 verwendet, aber RxJava war damals noch bei Java 7. Project Reactor bietet auch eine vielzahl von Operatoren, die komponierbar sind und es Ihnen ermöglichen, deklarativen Code für die Erstellung von Datenverarbeitungspipelines zu schreiben. Eine weitere schöne Sache von Project Reactor ist, dass es Adapter für die Konvertierung von Project-Reaktortypen in andere beliebte Implementierungstypen hat.

Vergleichen von APIs synchroner und asynchroner Vorgänge

Sie haben über synchrone Clients und Optionen für asynchrone Clients gelernt. In der folgenden Tabelle wird zusammengefasst, wie APIs aussehen, die mithilfe dieser Optionen entworfen wurden:

API-Typ Kein Wert Einzelner Wert Mehrere Werte
Standard Java – synchrone APIs void T Iterable<T>
Standard Java – asynchrone APIs CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktive Datenströme-Schnittstellen Publisher<Void> Publisher<T> Publisher<T>
Project Reactor-Implementierung von reaktiven Streams Mono<Void> Mono<T> Flux<T>

Aus Gründen der Vollständigkeit ist es erwähnenswert, dass Java 9 die Flow-Klasse eingeführt hat, die die vier reaktiven Datenstromschnittstellen enthält. Diese Klasse enthält jedoch keine Implementierung.

Verwenden asynchroner APIs im Azure SDK für Java

Die Spezifikation für reaktive Datenströme unterscheidet nicht zwischen Herausgebertypen. In der Spezifikation für reaktive Datenströme erzeugen Herausgeber einfach null oder mehr Datenelemente. In vielen Fällen gibt es einen nützlichen Unterschied zwischen einem Herausgeber, der höchstens ein Datenelement erzeugt, im Vergleich zu einem Element, das null oder mehr erzeugt. In cloudbasierten APIs gibt diese Unterscheidung an, ob eine Anforderung eine einzelwertige Antwort oder eine Auflistung zurückgibt. Project Reactor bietet zwei Typen, um diese Unterscheidung zu treffen - Mono und Flux. Eine API, die eine Mono Antwort zurückgibt, enthält eine Antwort, die höchstens einen Wert aufweist, und eine API, die eine Flux Antwort zurückgibt, die null oder mehr Werte enthält.

Angenommen, Sie verwenden einen ConfigurationAsyncClient , um eine konfiguration abzurufen, die mit dem Azure App-Konfigurationsdienst gespeichert ist. (Weitere Informationen finden Sie unter Was ist Die Azure-App-Konfiguration?)

Wenn Sie auf dem Client eine ConfigurationAsyncClient erstellen und getConfigurationSetting() aufrufen, wird ein Mono zurückgegeben, der angibt, dass die Antwort einen einzelnen Wert enthält. Das Aufrufen dieser Methode allein macht jedoch nichts. Der Client hat noch keine Anforderung an den Azure App Configuration-Dienst gestellt. In dieser Phase ist die von dieser API zurückgegebene Mono<ConfigurationSetting> nur eine "Zusammenstellung" der Datenverarbeitungspipeline. Diese Architektur bedeutet, dass die erforderliche Einrichtung für die Nutzung der Daten abgeschlossen ist. Um die Datenübertragung tatsächlich auszulösen (d. h., um die Anforderung an den Dienst zu senden und die Antwort zu erhalten), müssen Sie den zurückgegebenen Mono abonnieren. Wenn Sie also mit diesen reaktiven Datenströmen arbeiten, müssen Sie daran denken, subscribe() aufzurufen, da nichts passiert, bis Sie dies tun.

Das folgende Beispiel zeigt, wie Sie sich für Mono anmelden und den Konfigurationswert auf der Konsole ausgeben.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Beachten Sie, dass nach dem Aufrufen getConfigurationSetting() des Clients der Beispielcode das Ergebnis abonniert und drei separate Lambdas bereitstellt. Die erste Lambda-Funktion verwendet Daten, die vom Dienst empfangen werden, was bei erfolgreicher Antwort ausgelöst wird. Die zweite Lambda-Funktion wird ausgelöst, wenn beim Abrufen der Konfiguration ein Fehler auftritt. Die dritte Lambda-Funktion wird aufgerufen, wenn der Datenstrom abgeschlossen ist, was bedeutet, dass von diesem Datenstrom keine weiteren Datenelemente erwartet werden.

Hinweis

Wie bei allen asynchronen Programmierungen wird die Ausführung nach der Erstellung des Abonnements wie gewohnt fortgesetzt. Wenn es nichts gibt, das das Programm aktiv hält und weiter ausführt, könnte es beendet werden, bevor der asynchrone Vorgang abgeschlossen ist. Der Hauptthread, der subscribe() aufgerufen hat, wartet nicht, bis Sie den Netzwerkanruf an Azure App Configuration tätigen und eine Antwort empfangen. In Produktionssystemen können Sie möglicherweise weiter etwas anderes verarbeiten, aber in diesem Beispiel können Sie eine kleine Verzögerung hinzufügen, indem Sie Thread.sleep() aufrufen oder CountDownLatch verwenden, um dem asynchronen Vorgang eine Chance zu geben, abgeschlossen zu werden.

Wie im folgenden Beispiel gezeigt, folgen APIs, die ein Flux zurückgeben, einem ähnlichen Muster. Der Unterschied besteht darin, dass der erste an die subscribe() Methode bereitgestellte Rückruf mehrmals für jedes Datenelement in der Antwort aufgerufen wird. Der Fehler oder die Abschlussrückrufe werden genau einmal aufgerufen und als Terminalsignale betrachtet. Es werden keine anderen Rückrufe mehr aufgerufen, wenn eines dieser Signale vom Herausgeber empfangen wird.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Rückdruck

Was geschieht, wenn die Quelle die Daten schneller erzeugt, als der Abonnent verarbeiten kann? Der Abonnent kann durch die Datenflut überfordert werden, was zu Speicherfehlern führen kann. Der Abonnent benötigt eine Möglichkeit für Rückmeldungen an den Herausgeber, um die Übertragung zu verlangsamen, wenn er nicht Schritt halten kann. Standardmäßig fordert der Abonnent, wenn Sie wie im vorherigen Beispiel gezeigt subscribe() für ein Flux aufrufen, einen unbegrenzten Datenstrom an und signalisiert dem Publisher damit, dass die Daten so schnell wie möglich gesendet werden sollen. Dieses Verhalten ist nicht immer wünschenswert, und der Abonnent muss möglicherweise die Veröffentlichungsrate mittels "Backpressure" steuern. Backpressure ermöglicht es dem Abonnenten, die Steuerung des Flusses von Datenelementen zu übernehmen. Ein Abonnent fordert eine begrenzte Anzahl von Datenelementen an, die er verarbeiten kann. Nachdem der Abonnent die Verarbeitung dieser Elemente abgeschlossen hat, kann er mehr anfordern. Mithilfe von Backpressure können Sie ein Pushmodell für die Datenübertragung in ein Push-Pull-Modell umwandeln.

Das folgende Beispiel zeigt, wie Sie die Rate steuern können, mit der Ereignisse vom Event Hubs-Consumer empfangen werden:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Wenn der Abonnent zuerst eine Verbindung mit dem Herausgeber herstellt, übergibt der Herausgeber eine Subscription Instanz, die den Status der Datenübertragung verwaltet. Dies Subscription ist das Medium, über das der Abonnent Backpressure anwenden kann, indem request() aufgerufen wird, um anzugeben, wie viele weitere Datenelemente der Abonnent verarbeiten kann.

Wenn der Abonnent bei jedem Aufruf von onNext() mehr als ein Datenelement anfordert, request(10) zum Beispiel, sendet der Herausgeber die nächsten 10 Elemente sofort, wenn sie verfügbar sind, oder sobald sie verfügbar werden. Diese Elemente sammeln sich in einem Puffer beim Abonnenten an, und da jeder onNext()-Aufruf 10 weitere anfordert, wächst der Rückstau weiter, bis entweder der Publisher keine weiteren Datenelemente mehr zu senden hat oder der Puffer des Abonnenten überläuft, was zu Out-of-Memory-Fehlern führt.

Kündigen eines reaktiven Streamabonnements

Ein Abonnement verwaltet den Status der Datenübertragung zwischen einem Herausgeber und einem Abonnenten. Das Abonnement bleibt aktiv, bis der Herausgeber die Übertragung aller Daten an den Abonnenten abgeschlossen hat oder der Abonnent nicht mehr an dem Empfang von Daten interessiert ist. Sie können ein Abonnement auf verschiedene Arten kündigen, wie in den folgenden Beispielen gezeigt.

Im folgenden Beispiel wird das Abonnement durch Beseitigen des Abonnenten gekündigt:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Im folgenden Beispiel wird das Abonnement durch das Aufrufen der cancel()-Methode für Subscription storniert:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Schlussfolgerung

Threads sind teure Ressourcen, die Sie nicht verschwenden sollten, um auf Antworten von Remotedienstaufrufen zu warten. Da sich die Akzeptanz von Microservices-Architekturen erhöht, wird die Notwendigkeit, Ressourcen effizient zu skalieren und zu nutzen, entscheidend. Asynchrone APIs sind günstig, wenn netzwerkgebundene Vorgänge vorhanden sind. Das Azure SDK für Java bietet eine vielzahl von APIs für asynchrone Vorgänge, um Ihre Systemressourcen zu maximieren. Probieren Sie die asynchronen Clients aus.

Weitere Informationen zu den Betreibern, die Ihren speziellen Aufgaben am besten entsprechen, finden Sie im Referenzleitfaden für Reaktor 3unter Welchen Operator benötige ich?

Nächste Schritte

Nachdem Sie nun die verschiedenen Konzepte der asynchronen Programmierung besser verstehen, ist es wichtig zu erfahren, wie die Ergebnisse durchlaufen werden. Weitere Informationen zu den besten Iterationsstrategien und Details zur Funktionsweise der Paginierung finden Sie unter Paginierung und Iteration im Azure SDK für Java.