Protocolbuffers lezen en schrijven

Protocolbuffers (protobuf) is een taalneutraal binaire serialisatie-indeling die is ontwikkeld door Google. Azure Databricks-gebruikers komen dit het vaakst tegen bij het verwerken van binair gecodeerde records uit eventstreamingsystemen zoals Apache Kafka. Azure Databricks ondersteunt het lezen en schrijven van protobuf-gegevens met Apache Spark via de from_protobuf en to_protobuf functies, die worden geconverteerd tussen binaire protobuf- en Spark SQL-structtypen voor zowel streaming- als batchworkloads.

Prerequisites

Protobuf-functies vereisen Databricks Runtime 12.2 LTS en hoger.

Functie-syntaxis

Gebruik from_protobuf om een binaire kolom naar een struct om te zetten, en to_protobuf om een struct-kolom naar binaire gegevens om te zetten. U moet een descriptorbestand opgeven dat is geïdentificeerd door het descFilePath argument of een schemaregister dat is opgegeven met het options argument. Zie Protobuf voor een volledige lijst met opties.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Options

Geef opties door aan from_protobuf en to_protobuf, met behulp van het options-argument. Zie Protobuf voor een volledige lijst met ondersteunde opties.

Opties voor het schemaregister

De volgende opties zijn specifiek voor het gebruik van schemaregisters en worden niet behandeld in de algemene verwijzing naar opties.

Option Vereist Default Beschrijving
schema.registry.schema.evolution.mode No "restart" Hoe schemawijzigingen worden verwerkt wanneer een nieuwere schema-id wordt gedetecteerd in een binnenkomende record. "restart" beëindigt de query met een UnknownFieldException; configureer taken om opnieuw op te starten bij het niet ophalen van wijzigingen. "none" negeert wijzigingen in schema-id's en parseert nieuwere records met het oorspronkelijke schema.
confluent.schema.registry.<option> No Geef een Confluent Schema Registry-clientoptie door met behulp van het voorvoegsel "confluent.schema.registry". Stel "confluent.schema.registry.basic.auth.credentials.source" bijvoorbeeld in op "USER_INFO" en "confluent.schema.registry.basic.auth.user.info" op "<KEY>:<SECRET>" om basisauthenticatie te configureren.

Usage

In de volgende voorbeelden wordt de Wanderbricks-dataset gebruikt om te laten zien hoe Apache Spark-structs met to_protobuf() naar binaire protobuf worden geserialiseerd en hoe binaire protobuf-records met from_protobuf() worden gedeserialiseerd.

Protobuf gebruiken met Confluent Schema Registry

Azure Databricks biedt ondersteuning voor het gebruik van het Confluent-schemaregister om Protobuf te definiëren.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

Authenticeren bij een extern Confluent-schemaregister

Als u wilt verifiëren bij een extern Confluent-schemaregister, moet u de opties voor het schemaregister bijwerken om verificatiereferenties en API-sleutels op te nemen.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes

In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de schemaregisteropties bij volgens het volgende voorbeeld:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Protobuf gebruiken met een descriptorbestand

U kunt ook verwijzen naar een protobuf descriptorbestand dat beschikbaar is voor uw rekencluster. Zorg ervoor dat u over de juiste machtigingen beschikt om het bestand te lezen, afhankelijk van de locatie.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

Aanvullende bronnen

  • Avro-streaminggegevens lezen en schrijven: als uw streamingworkload gebruikmaakt van Avro-serialisatie in plaats van Protobuf, raadpleegt u de Avro-streamingfuncties voor de equivalente from_avro functies en to_avro functies.