Introducción: Compilación del primer modelo de aprendizaje automático en Databricks

En este cuaderno de ejemplo se muestra cómo entrenar un modelo de clasificación de aprendizaje automático en Databricks. Databricks Runtime para Machine Learning incluye muchas bibliotecas preinstaladas, como scikit-learn para algoritmos de entrenamiento y preprocesamiento, MLflow para realizar un seguimiento del proceso de desarrollo del modelo y Optuna para escalar el ajuste de hiperparámetros.

En este cuaderno, creará un modelo de clasificación para predecir si un vino se considera "de alta calidad". El conjunto de datos consta de 11 características de diferentes vinos (por ejemplo, contenido de alcohol, acidez y azúcar residual) y una clasificación de calidad del 1 al 10.

Esta tutorial abarca lo siguiente:

  • Parte 1: Entrenamiento de un modelo de clasificación con seguimiento de MLflow
  • Parte 2: Ajuste de hiperparámetros para mejorar el rendimiento del modelo
  • Parte 3: Guardar los resultados y los modelos en el catálogo de Unity
  • Parte 4: Implementación del modelo

Para obtener más información sobre la producción de aprendizaje automático en Databricks, incluida la administración del ciclo de vida del modelo y la inferencia del modelo, consulte el ejemplo de un extremo a otro de ML.

El conjunto de datos está disponible en UCI Machine Learning Repository y se presenta en Modeling wine preferences by data mining from physicochem properties [Cortez et al., 2009].

Requisitos

Configuración

En esta sección, hará lo siguiente:

  • Configure el cliente de MLflow para que use Unity Catalog como registro de modelos.
  • Establezca el catálogo y el esquema donde se registrará el modelo.
  • Lea los datos y guárdelo en tablas en el Catálogo de Unity.
  • Preprocese los datos.

Configuración del cliente de MLflow

De forma predeterminada, el cliente de MLflow Python crea modelos en el registro de modelos del área de trabajo de Databricks. Para guardar modelos en el catálogo de Unity, configure el cliente de MLflow como se muestra en la celda siguiente.

import mlflow
mlflow.set_registry_uri("databricks-uc")

La celda siguiente establece el catálogo y el esquema donde se registrará el modelo. Debe tener USE CATALOG privilegios en el catálogo y privilegios de USE_SCHEMA, CREATE_TABLE y CREATE_MODEL en el esquema. Cambie los nombres de catálogo y esquema en la celda siguiente si es necesario.

Para obtener más información, consulte la documentación del catálogo de Unity.

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"

Importar datos y guardarlos en las tablas del catálogo de Unity

El conjunto de datos está disponible en databricks-datasets. En la celda siguiente, importará los datos de los .csv archivos en DataFrames de Spark. A continuación, escriba los DataFrames en tablas en el Unity Catalog. Esto conserva los datos y le permite controlar cómo compartirlos con otros usuarios.

white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)

# Remove the spaces from the column names
for c in white_wine.columns:
    white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
    red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))

# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"

# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")

Preprocesar datos

# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

import matplotlib.pyplot as plt

import optuna
from mlflow.optuna.storage import MlflowStorage
from mlflow.pyspark.optuna.study import MlflowSparkStudy
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()

# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
  data_df,
  data_labels,
  test_size=0.2,
  random_state=1
)

Parte 1. Entrenamiento de un modelo de clasificación

# Enable MLflow autologging for this notebook
mlflow.autolog()

A continuación, entrene un clasificador dentro del contexto de una ejecución de MLflow, que registra automáticamente el modelo entrenado y muchos parámetros y métricas asociados.

Puede complementar el registro con métricas adicionales, como la puntuación de AUC del modelo en el conjunto de datos de prueba.

with mlflow.start_run(run_name='gradient_boost') as run:
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

    # Models, parameters, and training metrics are tracked automatically
    model.fit(X_train, y_train)

    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)

    # Save the ROC curve plot to a file
    roc_curve.figure_.savefig("roc_curve.png")

    # The AUC score on test data is not automatically logged, so log it manually
    mlflow.log_metric("test_auc", roc_auc)

    # Log the ROC curve image file as an artifact
    mlflow.log_artifact("roc_curve.png")

    print("Test AUC of: {}".format(roc_auc))

Visualización de ejecuciones de MLflow

Para ver la ejecución de entrenamiento registrada, haga clic en el icono Experimento que se encuentra en la esquina superior derecha del cuaderno para mostrar la barra lateral del experimento. Si es necesario, haga clic en el icono de actualización para capturar y supervisar las ejecuciones más recientes.

Experimentos enumerados en la barra lateral derecha

Para mostrar la página del experimento de MLflow más detallada, haga clic en el icono de la página del experimento. Esta página permite comparar ejecuciones y ver detalles de ejecuciones específicas. Consulte Seguimiento del desarrollo de modelos mediante MLflow.

Carga de modelos

También puede acceder a los resultados de una ejecución específica mediante la API de MLflow. El código de la celda siguiente muestra cómo cargar el modelo entrenado en una ejecución de MLflow determinada y usarlo para realizar predicciones. También puede encontrar fragmentos de código para cargar modelos específicos en la página de ejecución de MLflow.

# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=run.info.run_id
  )
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))

Parte 2. Ajuste de hiperparámetros

En este momento, ha entrenado un modelo sencillo y ha usado el servicio de seguimiento de MLflow para organizar el trabajo. A continuación, puede realizar un ajuste más sofisticado mediante Optuna.

Entrenamiento paralelo mediante Optuna

Optuna es una biblioteca de código abierto Python para el ajuste de hiperparámetros que se puede escalar horizontalmente entre varios recursos de proceso. Para obtener más información sobre el uso de Optuna en Databricks, consulte Ajuste de Hiperparámetros con Optuna.

def objective(trial):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    params = {
      'n_estimators': trial.suggest_int('n_estimators', 20, 1000),
      'learning_rate': trial.suggest_float('learning_rate', 0.05, 1.0, log=True),
      'max_depth': trial.suggest_int('max_depth', 2, 5),
    }
    model_hp = sklearn.ensemble.GradientBoostingClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    predicted_probs = model_hp.predict_proba(X_test)
    # Tune based on the test AUC
    # In production, you could use a separate validation set instead
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric('test_auc', roc_auc)

    # Negate the AUC because Optuna minimizes the objective by default
    return -roc_auc


with mlflow.start_run(run_name='gb_optuna') as run:
  # Use the MLflow Tracking Server as the Optuna storage backend
  experiment_id = mlflow.active_run().info.experiment_id
  mlflow_storage = MlflowStorage(experiment_id=experiment_id)

  # MlflowSparkStudy distributes the tuning using Spark workers
  mlflow_study = MlflowSparkStudy(
    study_name="gb-optuna-tuning",
    storage=mlflow_storage,
  )

  mlflow_study.optimize(objective, n_trials=32, n_jobs=4)

Ejecutar búsquedas para obtener el mejor modelo

Dado que MLflow realiza un seguimiento de todas las ejecuciones, puede recuperar las métricas y los parámetros para la mejor ejecución mediante la API de ejecuciones de búsqueda de MLflow para encontrar la ejecución de optimización con el auc de prueba más alto.

Este modelo optimizado debe funcionar mejor que los modelos más sencillos entrenados en la parte 1.

# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
  order_by=['metrics.test_auc DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)

# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)

Parte 3. Guardar los resultados y los modelos en el catálogo de Unity

predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")

results = spark.createDataFrame(best_model_predictions)

# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
model_uri = 'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )

mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")

Parte 4. Implementación de un modelo

Después de guardar su modelo en Unity Catalog, puede implementarlo mediante la interfaz de usuario de servicio. Las instrucciones siguientes proporcionan una breve descripción. Para más información, consulte Creación de puntos de conexión de servicio de modelos personalizados.

  1. Haga clic en Serving en la barra lateral para mostrar la interfaz de usuario de Serving.

Interfaz de usuario de servicio de modelos

  1. Haga clic en Crear punto de conexión de servicio.

  2. En el campo Nombre, proporcione un nombre para el punto de conexión.

  3. En la sección Entidades atendidas

    1. Haga clic en el campo Entidad para abrir el formulario Seleccionar entidad atendida.
    2. Seleccione Mis modelos: Catálogo de Unity. El formulario se actualiza dinámicamente en función de la selección.
    3. Seleccione el wine_quality_model y la versión de modelo que desea servir.
    4. Seleccione 100 como porcentaje de tráfico que desea enrutar al modelo servido.
    5. Seleccione CPU como tipo de proceso para este ejemplo.
    6. En Escalado horizontal de proceso, seleccione Pequeño como tamaño de escalado horizontal de proceso.
  4. Haga clic en Crear. La página de Puntos de servicio aparece con el estado del punto de servicio mostrado como No listo.

  5. Cuando el punto de conexión esté listo, seleccione Usar para enviar una solicitud de inferencia al punto de conexión.

Cuaderno de ejemplo

Introducción: Compilación del primer modelo de aprendizaje automático en Databricks

Obtención del cuaderno