Lire et écrire des Protocol Buffers

Les mémoires tampons de protocole (protobuf) sont un format de sérialisation binaire neutre en langage développé par Google. Les utilisateurs d’Azure Databricks y sont le plus souvent confrontés lors du traitement d’enregistrements encodés en binaire provenant de systèmes de diffusion d’événements en continu tels qu’Apache Kafka. Azure Databricks prend en charge la lecture et l’écriture de données Protobuf avec Apache Spark via les fonctions from_protobuf et to_protobuf, qui convertissent les données entre le format binaire Protobuf et les types struct de Spark SQL, aussi bien pour les charges de travail en streaming que par lots.

Prerequisites

Les fonctions Protobuf nécessitent Databricks Runtime 12.2 LTS et versions ultérieures.

Syntaxe de la fonction

Utilisez from_protobuf pour convertir une colonne binaire en structure, et to_protobuf pour convertir une colonne de structure en binaire. Vous devez fournir un fichier descripteur identifié par l’argument descFilePath ou un registre de schémas spécifié avec l’argument options . Pour obtenir la liste complète des options, consultez Protobuf.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Langage de programmation Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Options

Transmettez des options à from_protobuf et à to_protobuf à l’aide de l’argument options. Pour obtenir la liste complète des options prises en charge, consultez Protobuf.

Options du Registre de schémas

Les options suivantes sont spécifiques à l’utilisation du registre de schémas et ne sont pas couvertes dans la référence des options générales.

Option Requis Par défaut Description
schema.registry.schema.evolution.mode Non "restart" Comment les modifications de schéma sont gérées lorsqu’un ID de schéma plus récent est détecté dans un enregistrement entrant. "restart" met fin à la requête avec un UnknownFieldException; configurez les travaux pour redémarrer en cas de défaillance de la prise en charge des modifications. "none" ignore les modifications de l’ID de schéma et analyse les enregistrements plus récents avec le schéma d’origine.
confluent.schema.registry.<option> Non Spécifiez n’importe quelle option du client du Registre de schémas Confluent à l’aide du préfixe "confluent.schema.registry". Par exemple, définissez "confluent.schema.registry.basic.auth.credentials.source" sur "USER_INFO" et "confluent.schema.registry.basic.auth.user.info" sur "<KEY>:<SECRET>" pour configurer l’authentification de base.

Usage

Les exemples suivants utilisent le jeu de données Wanderbricks pour illustrer la sérialisation des structs Apache Spark en protobuf binaire avec to_protobuf() et désérialisant les enregistrements protobuf binaires avec from_protobuf().

Utiliser protobuf avec le registre de schémas Confluent

Azure Databricks prend en charge l’utilisation du Confluent Schema Registry pour définir Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

Langage de programmation Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

S'authentifier à un registre externe de schémas Confluent

Pour vous authentifier auprès d’un registre de schémas Confluent externe, mettez à jour les options du registre de schémas afin d’inclure les clés API et les informations d’identification pour l’authentification.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Langage de programmation Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Utiliser les fichiers truststore et keystore (magasin de clés) dans les volumes Unity Catalog

Dans Databricks Runtime 14.3 LTS et versions ultérieures, vous pouvez utiliser des fichiers truststore et keystore dans les volumes Unity Catalog pour vous authentifier auprès de Confluent Schema Registry. Mettez à jour les options de votre registre de schémas en fonction de l’exemple suivant :

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Langage de programmation Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Utiliser protobuf avec un fichier descripteur

Vous pouvez également référencer un fichier de descripteur protobuf qui est disponible pour votre cluster de calcul. Vérifiez que vous disposez des autorisations appropriées pour lire le fichier, en fonction de son emplacement.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

Langage de programmation Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

Ressources additionnelles

  • Lire et écrire des données Avro en streaming : si votre charge de travail de streaming utilise la sérialisation Avro plutôt que Protobuf, consultez les fonctions de streaming Avro pour les fonctions from_avro et to_avro équivalentes.