Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här artikeln beskriver den asynkrona programmeringsmodellen i Azure SDKs för Java.
Azure SDKs innehöll inledningsvis endast icke-blockerande, asynkrona API:er för interaktion med Azure-tjänster. Med dessa API:er kan du använda Azure SDKs för att skapa skalbara program som använder systemresurser effektivt. Men Azure SDKs för Java innehåller även synkrona klienter för att tillgodose en bredare målgrupp och göra klientbibliotek lättillgängliga för användare som inte är bekanta med asynkron programmering. (Se Approachable in the Azure SDKs design guidelines.) Därför erbjuder alla Java-klientbibliotek i Azure SDKs för Java både asynkrona och synkrona klienter. Använd dock de asynkrona klienterna för produktionssystem för att maximera användningen av systemresurser.
Reaktiva strömmar i Azure SDKs för Java
Om du tittar på avsnittet Async Service Clients i avsnittet Java Azure SDKs Design Guidelines, ser du att i stället för att använda CompletableFuture som tillhandahålls av Java 8 använder asynkrona API:er reaktiva typer. Varför valde Azure SDKs-teamet reaktiva typer framför typer som är inbyggda i JDK?
Java 8 introducerade funktioner som Streams, Lambdas och CompletableFuture. De här funktionerna har många funktioner, men de har vissa begränsningar.
CompletableFuture tillhandahåller återanropsbaserade, icke-blockerande funktioner, och CompletionStage gränssnittet gör det enkelt att skapa en serie asynkrona åtgärder. Lambdas gör dessa push-baserade API:er mer läsbara. Strömmar tillhandahåller åtgärder i funktionell stil för att hantera en samling dataelement. Strömmar är dock synkrona och kan inte återanvändas.
CompletableFuture gör att du kan göra en enda begäran, ger stöd för ett återanrop och förväntar dig ett enda svar. Många molntjänster kräver dock möjligheten att strömma data – till exempel Event Hubs.
Reaktiva strömmar kan hjälpa dig att övervinna dessa begränsningar genom att strömma element från en källa till en prenumerant. När en prenumerant begär data från en källa skickar källan tillbaka valfritt antal resultat. Det behöver inte skicka alla på en gång. Överföringen sker under en tidsperiod, när källan har data att skicka.
I den här modellen registrerar prenumeranten händelsehanterare för att bearbeta data när de tas emot. Dessa push-baserade interaktioner meddelar prenumeranten via distinkta signaler:
- Ett
onSubscribe()anrop anger att dataöverföringen är på väg att påbörjas. - Ett
onError()anrop anger att det fanns ett fel som också markerar slutet på dataöverföringen. - Ett
onComplete()anrop anger att dataöverföringen har slutförts.
Till skillnad från Java Streams behandlar reaktiva strömmar fel som förstklassiga händelser. Reaktiva strömmar har en dedikerad kanal där källan kan meddela eventuella fel till prenumeranten. Dessutom kan reaktiva strömmar göra det möjligt för prenumeranten att förhandla om dataöverföringshastigheten för att omvandla dessa strömmar till en push-pull-modell.
Reactive Streams-specifikationen tillhandahåller en standard för hur dataöverföring ska ske. På hög nivå definierar specifikationen följande fyra gränssnitt och anger regler för hur dessa gränssnitt ska implementeras.
- Publisher är källan till en dataström.
- Prenumerant är konsument av en dataström.
- Prenumerationen hanterar tillståndet för dataöverföring mellan en utgivare och en prenumerant.
- Processorn är både utgivare och prenumerant.
Vissa välkända Java bibliotek tillhandahåller implementeringar av den här specifikationen, till exempel RxJava, Akka Streams, Vert.x och Project Reactor.
Azure SDKs för Java antog Project Reactor för att erbjuda sina asynkrona API:er. Den viktigaste faktorn som drev detta beslut var att tillhandahålla en smidig integrering med Spring Webflux, som också använder Project Reactor. En annan bidragande faktor att välja Project Reactor framför RxJava var att Project Reactor använder Java 8 men RxJava, vid den tiden, var fortfarande på Java 7. Project Reactor erbjuder också en omfattande uppsättning operatorer som är komponerbara och gör att du kan skriva deklarativ kod för att skapa databearbetningskedjor. En annan trevlig sak med Project Reactor är att den har adaptrar för att konvertera Project Reactor-typer till andra populära implementeringstyper.
Jämför API:er för synkrona och asynkrona åtgärder
Du har lärt dig om synkrona klienter och alternativ för asynkrona klienter. I följande tabell sammanfattas hur API:er ser ut som är utformade med hjälp av följande alternativ:
| API-typ | Inget värde | Enskilt värde | Flera värden |
|---|---|---|---|
| Standard Java – synkrona API:er | void |
T |
Iterable<T> |
| Standard Java – Asynkrona API:er | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
| Reaktiva Streams-gränssnitt | Publisher<Void> |
Publisher<T> |
Publisher<T> |
| Implementering av reaktiva strömmar i Project Reactor | Mono<Void> |
Mono<T> |
Flux<T> |
För fullständighetens skull är det värt att nämna att Java 9 introducerade klassen Flow som innehåller de fyra reaktiva strömmarnas gränssnitt. Den här klassen innehåller dock ingen implementering.
Använda asynkrona API:er i Azure SDKs för Java
Specifikationen för reaktiva strömmar skiljer inte mellan olika typer av utgivare. I specifikationen för reaktiva strömmar skapar utgivare helt enkelt noll eller fler dataelement. I många fall finns det en användbar skillnad mellan en utgivare som producerar högst ett dataelement jämfört med ett som producerar noll eller mer. I molnbaserade API:er anger den här skillnaden om en begäran returnerar ett svar med en enda värde eller en samling. Project Reactor tillhandahåller två typer för att göra denna skillnad – Mono och Flux. Ett API som returnerar ett Mono innehåller ett svar som har högst ett värde, och ett API som returnerar ett Flux innehåller ett svar som har noll eller fler värden.
Anta till exempel att du använder en ConfigurationAsyncClient för att hämta en konfiguration som lagras med azure appkonfigurationstjänsten. (Mer information finns i Vad är Azure App Configuration?.)
Om du skapar ett ConfigurationAsyncClient och anropar getConfigurationSetting() klienten returneras ett Mono, som anger att svaret innehåller ett enda värde. Men att anropa den här metoden ensam gör ingenting. Klienten har ännu inte gjort någon begäran till Azure App Configuration-tjänsten. I det här skedet är det som returneras av det här API:et Mono<ConfigurationSetting> bara en "sammansättning" av databearbetningspipelinen. Den här arkitekturen innebär att den nödvändiga konfigurationen för att använda data är klar. Om du vill utlösa dataöverföringen (dvs. för att göra begäran till tjänsten och få svaret) måste du prenumerera på den returnerade Mono. Så när du hanterar dessa reaktiva strömmar måste du komma ihåg att ringa subscribe() eftersom ingenting händer förrän du gör det.
I följande exempel visas hur du prenumererar på Mono och skriver ut konfigurationsvärdet till konsolen.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Observera att efter att ha anropat getConfigurationSetting() på klienten, prenumererar exempelkoden på resultatet och tillhandahåller tre separata lambdas. Den första lambda använder data som tagits emot från tjänsten, vilket utlöses vid lyckat svar. Den andra lambda utlöses om det uppstår ett fel när konfigurationen hämtas. Den tredje lambda anropas när dataströmmen är klar, vilket innebär att inga fler dataelement förväntas från den här dataströmmen.
Anmärkning
Precis som med all asynkron programmering fortsätter exekveringen som vanligt när prenumerationen har skapats. Om det inte finns något som gör att programmet är aktivt och körs kan det avslutas innan asynkroniseringsåtgärden slutförs. Huvudtråden som anropade subscribe() väntar inte tills du gör nätverksanropet till Azure App Configuration och får ett svar. I produktionssystem kan du fortsätta att bearbeta något annat, men i det här exemplet kan du lägga till en liten fördröjning genom att anropa Thread.sleep() eller använda en CountDownLatch för att ge asynkron åtgärden en chans att slutföras.
Som du ser i följande exempel följer ÄVEN API:er som returnerar ett Flux liknande mönster. Skillnaden är att den första callback som tillhandahålls till metoden subscribe() anropas flera gånger för varje dataelement i svaret. Felet eller slutförandeåteranropen kallas exakt en gång och betraktas som en terminal signal. Inga andra återanrop anropas om någon av dessa signaler tas emot från utgivaren.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Ryggtryck
Vad händer när källan producerar data snabbare än vad prenumeranten kan hantera? Prenumeranten kan bli överbelastad med data, vilket kan leda till minnesfel. Prenumeranten behöver ett sätt att kommunicera tillbaka till utgivaren för att sakta ner när den inte kan hänga med. Som standard, när du, som i det tidigare exemplet, anropar subscribe() på en Flux, begär prenumeranten en obegränsad dataström, vilket signalerar till utgivaren att skicka data så snabbt som möjligt. Det här beteendet är inte alltid önskvärt och prenumeranten kan behöva kontrollera publiceringshastigheten via "backpressure". Med backpressure kan prenumeranten ta kontroll över flödet av dataelement. En prenumerant begär ett begränsat antal dataelement som den kan hantera. När prenumeranten har bearbetat dessa element kan den begära mer. Genom att använda backpressure kan du omvandla en push-modell för dataöverföring till en push-pull-modell.
I följande exempel visas hur du kan styra hur snabbt händelser tas emot av Event Hubs-konsumenten:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
När prenumeranten först "ansluter" till utgivaren ger utgivaren prenumeranten en Subscription instans som hanterar dataöverföringens tillstånd. Det här Subscription är mediet genom vilket prenumeranten kan använda backpressure genom att anropa request() för att ange hur många fler dataelement den kan hantera.
Om prenumeranten begär mer än ett dataelement varje gång den anropar onNext(), request(10) skickar utgivaren till exempel de följande 10 elementen omedelbart om de är tillgängliga eller när de blir tillgängliga. Dessa element samlas i en buffert hos prenumeranten, och eftersom varje onNext()-anrop begär 10 element till fortsätter kön att växa tills antingen publiceraren inte har fler dataelement att skicka eller prenumerantens buffert svämmar över, vilket leder till slut-på-minnet-fel.
Avbryta en reaktiv stream-prenumeration
En prenumeration hanterar tillståndet för dataöverföring mellan en utgivare och en prenumerant. Prenumerationen förblir aktiv tills utgivaren har överfört alla data till prenumeranten eller så är prenumeranten inte längre intresserad av att ta emot data. Du kan avbryta en prenumeration på ett par sätt, som du ser i följande exempel.
I följande exempel avslutas prenumerationen genom att abonnenten tas bort.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Följande exempel avbryter prenumerationen genom att anropa cancel() metoden på Subscription:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Slutsats
Trådar är dyra resurser som du inte bör slösa på att vänta på svar från fjärrtjänstsamtal. I takt med att införandet av mikrotjänstarkitekturer ökar blir behovet av att skala och använda resurser effektivt avgörande. Asynkrona API:er är gynnsamma när det finns nätverksbundna åtgärder. Azure SDKs för Java erbjuder en omfattande uppsättning API:er för asynkrona åtgärder för att maximera systemresurserna. Prova asynkrona klienter.
Mer information om de operatorer som bäst passar just dina uppgifter finns i Vilken operatör behöver jag? i referensguiden för Reactor 3.
Nästa steg
Nu när du bättre förstår de olika asynkrona programmeringsbegreppen är det viktigt att lära dig att iterera över resultaten. Mer information om de bästa iterationsstrategierna och information om hur sidnumrering fungerar finns i Sidnumrering och iteration i Azure SDKs för Java.