Introdução: Criar seu primeiro modelo de machine learning no Databricks

Este notebook de exemplo ilustra como treinar um modelo de classificação de machine learning no Databricks. O Databricks Runtime para Machine Learning vem com muitas bibliotecas pré-instaladas, incluindo scikit-learn para algoritmos de treinamento e pré-processamento, MLflow para acompanhar o processo de desenvolvimento de modelos e Hyperopt com SparkTrials para dimensionar o ajuste do hiperparâmetro.

Neste notebook, você cria um modelo de classificação para prever se um vinho é considerado de "alta qualidade". O conjunto de dados consiste em 11 características de diferentes vinhos (por exemplo, teor alcoólico, acidez e açúcar residual) e uma classificação de qualidade entre 1 e 10.

Este tutorial abrange:

  • Parte 1: Treinar um modelo de classificação com o acompanhamento do MLflow
  • Parte 2: Ajuste de hiperparâmetro para melhorar o desempenho do modelo
  • Parte 3: Salvar resultados e modelos no Catálogo do Unity
  • Parte 4: Implantar o modelo

Para obter mais detalhes sobre como produzir machine learning no Databricks, incluindo o gerenciamento do ciclo de vida do modelo e a inferência de modelo, consulte o exemplo de ponta a ponta do ML.

O conjunto de dados está disponível no Repositório de Aprendizado de Máquina da UCI e é apresentado em Modelando preferências de vinho por mineração de dados a partir de propriedades físico-químicas [Cortez et al., 2009].

Requirements

Configuração

Nesta seção, você fará o seguinte:

  • Configure o cliente MLflow para usar o Catálogo do Unity como o registro de modelo.
  • Defina o catálogo e o esquema em que o modelo será registrado.
  • Leia os dados e salve-os em tabelas no Catálogo do Unity.
  • Pré-processar os dados.

Configurar o cliente MLflow

Por padrão, o cliente Python do MLflow cria modelos no registro de modelos do workspace do Databricks. Para salvar modelos no Catálogo do Unity, configure o cliente MLflow, conforme mostrado na célula a seguir.

import mlflow
mlflow.set_registry_uri("databricks-uc")

A célula a seguir define o catálogo e o esquema em que o modelo será registrado. Você deve ter o privilégio USE CATALOG no catálogo e os privilégios USE_SCHEMA, CREATE_TABLE e CREATE_MODEL no esquema. Altere os nomes de catálogo e esquema na célula a seguir, se necessário.

Para obter mais informações, consulte a documentação do Catálogo do Unity.

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"

Carregar dados e salvá-los em tabelas no Unity Catalog

O conjunto de dados está disponível em databricks-datasets. Na célula a seguir, você lê os dados dos .csv arquivos no Spark DataFrames. Em seguida, você grava os DataFrames em tabelas no Catálogo do Unity. Isso persiste os dados e permite que você controle como compartilhá-los com outras pessoas.

white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)

# Remove the spaces from the column names
for c in white_wine.columns:
    white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
    red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))

# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"

# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")

Pré-processar dados

# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

import matplotlib.pyplot as plt

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()

# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
  data_df,
  data_labels,
  test_size=0.2,
  random_state=1
)

Parte 1. Treinar um modelo de classificação

# Enable MLflow autologging for this notebook
mlflow.autolog()

Em seguida, treine um classificador no contexto de uma execução do MLflow, que registra automaticamente o modelo treinado e muitas métricas e parâmetros associados.

Você pode complementar o registro em log com métricas adicionais, como a pontuação do AUC do modelo no conjunto de dados de teste.

with mlflow.start_run(run_name='gradient_boost') as run:
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

    # Models, parameters, and training metrics are tracked automatically
    model.fit(X_train, y_train)

    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)

    # Save the ROC curve plot to a file
    roc_curve.figure_.savefig("roc_curve.png")

    # The AUC score on test data is not automatically logged, so log it manually
    mlflow.log_metric("test_auc", roc_auc)

    # Log the ROC curve image file as an artifact
    mlflow.log_artifact("roc_curve.png")

    print("Test AUC of: {}".format(roc_auc))

Exibir execuções do MLflow

Para ver o treinamento registrado em log, clique no ícone Experimentoícone Experimento no canto superior direito do notebook para exibir a barra lateral do experimento. Se necessário, clique no ícone de atualização para buscar e monitorar as execuções mais recentes.

Experimentos listados na barra lateral direita

Para exibir a página de experimentos do MLflow mais detalhada, clique no ícone da página de experimentos. Esta página permite comparar execuções e exibir detalhes de execuções específicas. Consulte Rastrear o desenvolvimento de modelos usando o MLflow.

Modelos de carga

Você também pode acessar os resultados de uma execução específica usando a API do MLflow. O código na célula a seguir ilustra como carregar o modelo treinado em uma determinada execução do MLflow e usá-lo para fazer previsões. Você também pode encontrar snippets de código para carregar modelos específicos na página de execução do MLflow.

# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=run.info.run_id
  )
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))

Parte 2. Ajuste de hiperparâmetros

Neste ponto, você treinou um modelo simples e usou o serviço de acompanhamento do MLflow para organizar seu trabalho. Em seguida, você pode executar um ajuste mais sofisticado usando o Hyperopt.

Treinamento paralelo com Hyperopt e SparkTrials

Hyperopt é uma biblioteca Python para ajuste de hiperparâmetro. Para obter mais informações sobre como usar o Hyperopt no Databricks, consulte Usar algoritmos de treinamento distribuídos com o Hyperopt.

Você pode usar o Hyperopt com SparkTrials para executar varreduras de hiperparâmetro e treinar vários modelos em paralelo. Isso reduz o tempo necessário para otimizar o desempenho do modelo. O acompanhamento do MLflow é integrado ao Hyperopt para registrar em log automaticamente modelos e parâmetros.

# Define the search space to explore
search_space = {
  'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
  'learning_rate': hp.loguniform('learning_rate', -3, 0),
  'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}

def train_model(params):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    model_hp = sklearn.ensemble.GradientBoostingClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    predicted_probs = model_hp.predict_proba(X_test)
    # Tune based on the test AUC
    # In production, you could use a separate validation set instead
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric('test_auc', roc_auc)

    # Set the loss to -1*auc_score so fmin maximizes the auc_score
    return {'status': STATUS_OK, 'loss': -1*roc_auc}

# SparkTrials distributes the tuning using Spark workers
# Greater parallelism speeds processing, but each hyperparameter trial has less information from other trials
# On smaller clusters try setting parallelism=2
spark_trials = SparkTrials(
  parallelism=1
)

with mlflow.start_run(run_name='gb_hyperopt') as run:
  # Use hyperopt to find the parameters yielding the highest AUC
  best_params = fmin(
    fn=train_model,
    space=search_space,
    algo=tpe.suggest,
    max_evals=32,
    trials=spark_trials)

A pesquisa é executada para recuperar o melhor modelo

Como todas as execuções são controladas pelo MLflow, você pode recuperar as métricas e os parâmetros para a melhor execução usando a API de execuções de pesquisa do MLflow para localizar a execução de ajuste com a auc de teste mais alta.

Esse modelo ajustado deve ter um desempenho melhor do que os modelos mais simples treinados na Parte 1.

# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
  order_by=['metrics.test_auc DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)

# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)

Parte 3. Salvar resultados e modelos no Catálogo do Unity

predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")

results = spark.createDataFrame(best_model_predictions)

# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
model_uri = 'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )

mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")

Parte 4. Implantar modelo

Depois de salvar seu modelo no Catálogo do Unity, você pode implantá-lo usando a interface do usuário de serviço. As instruções a seguir fornecem uma breve descrição. Para obter mais informações, consulte Criar modelo personalizado que serve pontos de extremidade.

  1. Clique em Servindo na barra lateral para exibir a interface do usuário de serviço.

Modelo que atende a interface do usuário

  1. Clique em Criar endpoint de serviço.

  2. No campo Nome, forneça um nome para o ponto de extremidade.

  3. Na seção Entidades atendidas

    1. Clique no campo Entidade para abrir o formulário Selecionar entidade atendida.
    2. Selecione Meus modelos– Catálogo do Unity. O formulário é atualizado dinamicamente com base em sua seleção.
    3. Selecione o wine_quality_model e a versão do modelo que você deseja servir.
    4. Selecione 100 como o percentual de tráfego que você deseja rotear para o modelo servido.
    5. Selecione CPU como o tipo de computação para este exemplo.
    6. Em Escala de Computação, selecione Pequeno como o tamanho da escala da computação.
  4. Clique em Criar. A página Endpontos de Serviço é exibida com estado do endponto de serviço mostrado como Não Pronto.

  5. Quando o ponto de extremidade estiver pronto, selecione Usar para enviar uma solicitação de inferência para o ponto de extremidade.

Caderno de exemplo

Introdução: Criar seu primeiro modelo de machine learning no Databricks

Obter laptop