Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Protocolbuffers (protobuf) is een taalneutraal binaire serialisatie-indeling die is ontwikkeld door Google. Azure Databricks-gebruikers komen dit het vaakst tegen bij het verwerken van binair gecodeerde records uit eventstreamingsystemen zoals Apache Kafka. Azure Databricks ondersteunt het lezen en schrijven van protobuf-gegevens met Apache Spark via de from_protobuf en to_protobuf functies, die worden geconverteerd tussen binaire protobuf- en Spark SQL-structtypen voor zowel streaming- als batchworkloads.
Prerequisites
Protobuf-functies vereisen Databricks Runtime 12.2 LTS en hoger.
Functie-syntaxis
Gebruik from_protobuf om een binaire kolom naar een struct om te zetten, en to_protobuf om een struct-kolom naar binaire gegevens om te zetten. U moet een descriptorbestand opgeven dat is geïdentificeerd door het descFilePath argument of een schemaregister dat is opgegeven met het options argument. Zie Protobuf voor een volledige lijst met opties.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Options
Geef opties door aan from_protobuf en to_protobuf, met behulp van het options-argument. Zie Protobuf voor een volledige lijst met ondersteunde opties.
Opties voor het schemaregister
De volgende opties zijn specifiek voor het gebruik van schemaregisters en worden niet behandeld in de algemene verwijzing naar opties.
| Option | Vereist | Default | Beschrijving |
|---|---|---|---|
schema.registry.schema.evolution.mode |
No | "restart" |
Hoe schemawijzigingen worden verwerkt wanneer een nieuwere schema-id wordt gedetecteerd in een binnenkomende record.
"restart" beëindigt de query met een UnknownFieldException; configureer taken om opnieuw op te starten bij het niet ophalen van wijzigingen.
"none" negeert wijzigingen in schema-id's en parseert nieuwere records met het oorspronkelijke schema. |
confluent.schema.registry.<option> |
No | — | Geef een Confluent Schema Registry-clientoptie door met behulp van het voorvoegsel "confluent.schema.registry". Stel "confluent.schema.registry.basic.auth.credentials.source" bijvoorbeeld in op "USER_INFO" en "confluent.schema.registry.basic.auth.user.info" op "<KEY>:<SECRET>" om basisauthenticatie te configureren. |
Usage
In de volgende voorbeelden wordt de Wanderbricks-dataset gebruikt om te laten zien hoe Apache Spark-structs met to_protobuf() naar binaire protobuf worden geserialiseerd en hoe binaire protobuf-records met from_protobuf() worden gedeserialiseerd.
Protobuf gebruiken met Confluent Schema Registry
Azure Databricks biedt ondersteuning voor het gebruik van het Confluent-schemaregister om Protobuf te definiëren.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
reviewsRestoredDF.show()
Authenticeren bij een extern Confluent-schemaregister
Als u wilt verifiëren bij een extern Confluent-schemaregister, moet u de opties voor het schemaregister bijwerken om verificatiereferenties en API-sleutels op te nemen.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes
In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de schemaregisteropties bij volgens het volgende voorbeeld:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Protobuf gebruiken met een descriptorbestand
U kunt ook verwijzen naar een protobuf descriptorbestand dat beschikbaar is voor uw rekencluster. Zorg ervoor dat u over de juiste machtigingen beschikt om het bestand te lezen, afhankelijk van de locatie.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
descriptor_file = "/path/to/proto_descriptor.desc"
# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
val descriptorFile = "/path/to/proto_descriptor.desc"
// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()
Aanvullende bronnen
-
Avro-streaminggegevens lezen en schrijven: als uw streamingworkload gebruikmaakt van Avro-serialisatie in plaats van Protobuf, raadpleegt u de Avro-streamingfuncties voor de equivalente
from_avrofuncties ento_avrofuncties.