Läs- och skrivprotokollbuffertar

Protocol Buffers (protobuf) är ett språkneutralt binärt serialiseringsformat som utvecklats av Google. Azure Databricks användare stöter oftast på det när de bearbetar binärkodade poster från händelseströmningssystem som Apache Kafka. Azure Databricks har stöd för att läsa och skriva protobuf-data med Apache Spark genom from_protobuf och to_protobuf-funktionerna, som konverterar mellan binära protobuf-data och structtyper i Spark SQL för arbetsbelastningar för både strömning och batch.

Förutsättningar

Protobuf-funktioner kräver Databricks Runtime 12.2 LTS och senare.

Funktionssyntax

Använd from_protobuf för att omvandla en binär kolumn till en struct och to_protobuf för att omvandla en structkolumn till binär. Du måste ange antingen en beskrivande fil som identifieras av descFilePath argumentet eller ett schemaregister som anges med options argumentet. En fullständig lista över alternativ finns i Protobuf.

python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Options

Skicka alternativ till from_protobuf och to_protobuf med argumentet options . En fullständig lista över alternativ som stöds finns i Protobuf.

Alternativ för Schema Registry

Följande alternativ är specifika för schemaregisteranvändning och omfattas inte av referensen för allmänna alternativ.

Option Obligatoriskt Förinställning Description
schema.registry.schema.evolution.mode No "restart" Hur schemaändringar hanteras när ett nyare schema-ID identifieras i en inkommande post. "restart" avslutar frågan med en UnknownFieldException; konfigurera jobb så att de startas om vid fel för att fånga upp ändringar. "none" ignorerar schema-ID-ändringar och parsar nyare poster med det ursprungliga schemat.
confluent.schema.registry.<option> No Ange valfritt Confluent Schema Registry-klientalternativ med prefixet "confluent.schema.registry". Ställ till exempel in "confluent.schema.registry.basic.auth.credentials.source""USER_INFO" och "confluent.schema.registry.basic.auth.user.info""<KEY>:<SECRET>" för att konfigurera grundläggande autentisering.

Usage

I följande exempel används Wanderbricks-datauppsättningen för att demonstrera serialisering av Apache Spark-structs till binära protobuf med to_protobuf() och deserialisera binära protobuf-poster med from_protobuf().

Använd protobuf med Confluent Schema Registry

Azure Databricks stöder användning av Confluent Schema Registry för att definiera Protobuf.

python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

Autentisera till ett externt Confluent-schemaregister

Om du vill autentisera till ett externt Confluent Schema Registry uppdaterar du dina schemaregisteralternativ så att de innehåller autentiseringsuppgifter och API-nycklar.

python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Använd truststore- och keystore-filer i Unity Catalog-volymer

I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalog-volymer för att autentisera till ett Confluent-schemaregister. Uppdatera schemaregisteralternativen enligt följande exempel:

python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Använda Protobuf med en beskrivningsfil

Du kan också referera till en protobuf-beskrivningsfil som är tillgänglig för ditt beräkningskluster. Kontrollera att du har rätt behörighet att läsa filen, beroende på var den finns.

python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

Ytterligare resurser

  • Läsa och skriva Avro-data i dataströmmar: Om din arbetsbelastning för dataströmmar använder Avro-serialisering i stället för Protobuf, se Avro-strömningsfunktionerna för motsvarande funktioner from_avro och to_avro.