Introduzione: Creare il primo modello di Machine Learning in Databricks

Questo notebook di esempio illustra come eseguire il training di un modello di classificazione di Machine Learning in Databricks. Databricks Runtime per Machine Learning include molte librerie preinstallate, tra cui scikit-learn per algoritmi di training e pre-elaborazione, MLflow per tenere traccia del processo di sviluppo del modello e Optuna per ridimensionare l'ottimizzazione degli iperparametri.

In questo notebook si crea un modello di classificazione per stimare se un vino è considerato "di alta qualità". Il set di dati è costituito da 11 caratteristiche di vini diversi (ad esempio, contenuto di alcol, acidità e zucchero residuo) e una classificazione di qualità compresa tra 1 e 10.

Questa esercitazione tratta:

  • Parte 1: Addestrare un modello di classificazione con il tracciamento di MLflow
  • Parte 2: Ottimizzazione degli iperparametri per migliorare le prestazioni del modello
  • Parte 3: Salvare i risultati e i modelli in Unity Catalog
  • Parte 4: Distribuire il modello

Per altri dettagli sulla produzione di Machine Learning in Databricks, tra cui la gestione del ciclo di vita del modello e l'inferenza del modello, vedere l'esempio end-to-end di ML.

Il set di dati è disponibile dal repository UCI Machine Learning e viene presentato in "Modellazione delle preferenze dei vini tramite data mining dalle proprietà fisico-chimiche" [Cortez et al., 2009].

Requisiti

Setup

In questa sezione vengono eseguite le operazioni seguenti:

  • Configurare il client MLflow per l'uso di Unity Catalog come registro dei modelli.
  • Impostare il catalogo e lo schema in cui verrà registrato il modello.
  • Leggere i dati e salvarli nelle tabelle in Unity Catalog.
  • Preprocessare i dati.

Configurare il client MLflow

Per impostazione predefinita, il client MLflow Python crea modelli nel registro dei modelli dell'area di lavoro di Databricks. Per salvare i modelli in Unity Catalog, configurare il client MLflow come illustrato nella cella seguente.

import mlflow
mlflow.set_registry_uri("databricks-uc")

La cella seguente imposta il catalogo e lo schema in cui verrà registrato il modello. È necessario avere USE CATALOG privilegi per il catalogo e USE_SCHEMA, CREATE_TABLE e CREATE_MODEL privilegi per lo schema. Se necessario, modificare i nomi del catalogo e dello schema nella cella seguente.

Per altre informazioni, vedere la documentazione di Unity Catalog.

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"

Leggere i dati e salvarli nelle tabelle in Unity Catalog

Il set di dati è disponibile in databricks-datasets. Nella cella seguente leggi i dati dai file .csv nei DataFrame di Spark. I dataframe vengono quindi scritti nelle tabelle in Unity Catalog. Questo rende persistenti i dati e consente di controllare come condividerli con altri utenti.

white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)

# Remove the spaces from the column names
for c in white_wine.columns:
    white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
    red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))

# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"

# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")

Pre-elaborare i dati

# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

import matplotlib.pyplot as plt

import optuna
from mlflow.optuna.storage import MlflowStorage
from mlflow.pyspark.optuna.study import MlflowSparkStudy
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()

# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
  data_df,
  data_labels,
  test_size=0.2,
  random_state=1
)

Parte 1. Eseguire il training di un modello di classificazione

# Enable MLflow autologging for this notebook
mlflow.autolog()

Eseguire quindi il training di un classificatore all'interno del contesto di un'esecuzione MLflow, che registra automaticamente il modello sottoposto a training e molte metriche e parametri associati.

È possibile integrare la registrazione con metriche aggiuntive, ad esempio il punteggio AUC del modello nel set di dati di test.

with mlflow.start_run(run_name='gradient_boost') as run:
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

    # Models, parameters, and training metrics are tracked automatically
    model.fit(X_train, y_train)

    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)

    # Save the ROC curve plot to a file
    roc_curve.figure_.savefig("roc_curve.png")

    # The AUC score on test data is not automatically logged, so log it manually
    mlflow.log_metric("test_auc", roc_auc)

    # Log the ROC curve image file as an artifact
    mlflow.log_artifact("roc_curve.png")

    print("Test AUC of: {}".format(roc_auc))

Visualizzare le esecuzioni di MLflow

Per visualizzare l'esecuzione di training registrata, fare clic sull'icona Esperimentoicona Esperimento in alto a destra del notebook per visualizzare la barra laterale dell'esperimento. Se necessario, fare clic sull'icona di aggiornamento per recuperare e monitorare le esecuzioni più recenti.

Esperimenti elencati nella barra laterale destra

Per visualizzare la pagina dell'esperimento MLflow più dettagliata, fare clic sull'icona della pagina dell'esperimento. Questa pagina consente di confrontare le esecuzioni e visualizzare i dettagli per esecuzioni specifiche. Consultare Tenere traccia dello sviluppo di modelli con MLflow.

Caricare i modelli

È anche possibile accedere ai risultati per un'esecuzione specifica usando l'API MLflow. Il codice nella cella seguente illustra come caricare il modello sottoposto a training in una determinata esecuzione di MLflow e usarlo per eseguire stime. È anche possibile trovare frammenti di codice per il caricamento di modelli specifici nella pagina di esecuzione di MLflow.

# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=run.info.run_id
  )
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))

Parte 2. Ottimizzazione degli iperparametri

A questo punto, hai addestrato un modello semplice e hai utilizzato il servizio di monitoraggio MLflow per organizzare il tuo lavoro. Successivamente, è possibile eseguire un'ottimizzazione più sofisticata usando Optuna.

Training parallelo con Optuna

Optuna è una libreria di Python open source per l'ottimizzazione degli iperparametri che può essere ridimensionata orizzontalmente tra più risorse di calcolo. Per altre informazioni sull'uso di Optuna in Databricks, vedere Ottimizzazione degli iperparametri con Optuna.

def objective(trial):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    params = {
      'n_estimators': trial.suggest_int('n_estimators', 20, 1000),
      'learning_rate': trial.suggest_float('learning_rate', 0.05, 1.0, log=True),
      'max_depth': trial.suggest_int('max_depth', 2, 5),
    }
    model_hp = sklearn.ensemble.GradientBoostingClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    predicted_probs = model_hp.predict_proba(X_test)
    # Tune based on the test AUC
    # In production, you could use a separate validation set instead
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric('test_auc', roc_auc)

    # Negate the AUC because Optuna minimizes the objective by default
    return -roc_auc


with mlflow.start_run(run_name='gb_optuna') as run:
  # Use the MLflow Tracking Server as the Optuna storage backend
  experiment_id = mlflow.active_run().info.experiment_id
  mlflow_storage = MlflowStorage(experiment_id=experiment_id)

  # MlflowSparkStudy distributes the tuning using Spark workers
  mlflow_study = MlflowSparkStudy(
    study_name="gb-optuna-tuning",
    storage=mlflow_storage,
  )

  mlflow_study.optimize(objective, n_trials=32, n_jobs=4)

Esecuzioni di ricerca per recuperare il modello migliore

Poiché tutte le esecuzioni vengono rilevate da MLflow, è possibile recuperare le metriche e i parametri per l'esecuzione ottimale usando l'API delle esecuzioni di ricerca MLflow per trovare l'esecuzione dell'ottimizzazione con l'auc di test più alto.

Questo modello ottimizzato dovrebbe offrire prestazioni migliori rispetto ai modelli più semplici sottoposti a training nella parte 1.

# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
  order_by=['metrics.test_auc DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)

# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)

Parte 3. Salvare i risultati e i modelli in Unity Catalog

predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")

results = spark.createDataFrame(best_model_predictions)

# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
model_uri = 'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )

mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")

Parte 4. Distribuire il modello

Dopo aver salvato il modello in Unity Catalog, è possibile distribuirlo usando l'interfaccia utente di gestione. Le istruzioni seguenti forniscono una breve descrizione. Per altre informazioni, vedere Creare endpoint di gestione di modelli personalizzati.

  1. Fare clic su Serve nella barra laterale per visualizzare l'interfaccia utente Di servizio.

Interfaccia utente per la gestione dei modelli

  1. Fare click su Crea l'endpoint di servizio.

  2. Nel campo Nome specificare un nome per l'endpoint.

  3. Nella sezione Entità servite

    1. Fare clic nel campo entità per aprire il modulo Seleziona entità servita .
    2. Selezionare My models - Unity Catalog (Modelli personali- Catalogo Unity). Il modulo viene aggiornato dinamicamente in base alla selezione.
    3. Selezionare il wine_quality_model e la versione del modello che si desidera utilizzare.
    4. Selezionare 100 come percentuale di traffico che si vuole instradare al modello servito.
    5. Selezionare CPU come tipo di calcolo per questo esempio.
    6. In Scalabilità di calcolo orizzontale, selezionare Small come dimensione della scalabilità di calcolo orizzontale.
  4. Fare clic su Crea. La pagina Gestione degli endpoint viene visualizzata con Lo stato dell'endpoint di servizio visualizzato come Non pronto.

  5. Quando l'endpoint è Pronto, selezionare Usa per inviare una richiesta di inferenza all'endpoint.

Notebook di esempio

Introduzione: Creare il primo modello di Machine Learning in Databricks

Prendi il notebook