はじめに: Databricks で初めての機械学習モデルを構築する

このノートブック例では、Databricks で機械学習分類モデルをトレーニングする方法を示します。 Databricks Runtime for Machine Learningには、トレーニングアルゴリズムと前処理アルゴリズム用の scikit-learn、モデル開発プロセスを追跡するための MLflow、ハイパーパラメーター調整をスケーリングするための Optuna など、多くのライブラリがプレインストールされています。

このノートブックでは、ワインが "高品質" と見なされるかどうかを予測する分類モデルを作成します。 データセットは、さまざまなワインの 11 種類の特徴 (アルコール含有量、酸度、残留糖など) と、1 から 10 の品質ランク付けで構成されます。

このチュートリアルの内容:

  • パート 1: MLflow 追跡を使用して分類モデルをトレーニングする
  • パート 2: モデルのパフォーマンスを向上させるためのハイパーパラメーターのチューニング
  • パート 3: 結果とモデルを Unity カタログに保存する
  • パート 4: モデルをデプロイする

モデル ライフサイクル管理やモデル推論など、Databricks での機械学習の運用の詳細については、 ML のエンド ツー エンドの例を参照してください。

データセットは、UCI Machine Learning リポジトリ から入手でき、物理化学的プロパティを用いたデータマイニングによってワインの好みのモデル化 [Cortez et al., 2009]で提供されます。

Requirements

セットアップ

このセクションでは、次の操作を行います。

  • モデル レジストリとして Unity カタログを使用するように MLflow クライアントを構成します。
  • モデルを登録するカタログとスキーマを設定します。
  • データを読み取り、Unity カタログのテーブルに保存します。
  • データを前処理します。

MLflow クライアントの構成

既定では、MLflow Python クライアントは Databricks ワークスペース モデル レジストリにモデルを作成します。 Unity カタログにモデルを保存するには、次のセルに示すように MLflow クライアントを構成します。

import mlflow
mlflow.set_registry_uri("databricks-uc")

次のセルは、モデルが登録されるカタログとスキーマを設定します。 カタログに対する USE CATALOG 権限と、スキーマに対するUSE_SCHEMA、CREATE_TABLE、およびCREATE_MODEL権限が必要です。 必要に応じて、次のセルのカタログ名とスキーマ名を変更します。

詳細については、 Unity カタログのドキュメントを参照してください

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"

データを読み取り、Unity カタログのテーブルに保存する

データセットは、 databricks-datasetsで使用できます。 次のセルでは、 .csv ファイルから Spark DataFrames にデータを読み込みます。 その後、DataFrames を Unity カタログのテーブルに書き込みます。 両方ともデータを保持し、他のユーザーと共有する方法を制御できます。

white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)

# Remove the spaces from the column names
for c in white_wine.columns:
    white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
    red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))

# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"

# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")

データの前処理

# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

import matplotlib.pyplot as plt

import optuna
from mlflow.optuna.storage import MlflowStorage
from mlflow.pyspark.optuna.study import MlflowSparkStudy
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()

# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
  data_df,
  data_labels,
  test_size=0.2,
  random_state=1
)

第 1 部 分類モデルをトレーニングする

# Enable MLflow autologging for this notebook
mlflow.autolog()

次に、MLflow 実行のコンテキスト内で分類子をトレーニングします。これによって、トレーニング済みのモデルと多数の関連するメトリックとパラメーターが自動的にログに記録されます。

テスト データセットのモデルの AUC スコアなどの追加のメトリックでログ記録を補足できます。

with mlflow.start_run(run_name='gradient_boost') as run:
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

    # Models, parameters, and training metrics are tracked automatically
    model.fit(X_train, y_train)

    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)

    # Save the ROC curve plot to a file
    roc_curve.figure_.savefig("roc_curve.png")

    # The AUC score on test data is not automatically logged, so log it manually
    mlflow.log_metric("test_auc", roc_auc)

    # Log the ROC curve image file as an artifact
    mlflow.log_artifact("roc_curve.png")

    print("Test AUC of: {}".format(roc_auc))

MLflow の実行を表示する

ログに記録されたトレーニングの実行を表示するには、ノートブックの右上にある 実験 アイコン [ 実験] アイコン をクリックして、実験のサイドバーを表示します。 必要に応じて、更新アイコンをクリックして、最新の実行を取得して監視します。

右側のサイドバーに一覧表示されている実験

より詳細な MLflow 実験ページを表示するには、実験ページ アイコンをクリックします。 このページでは、実行を比較し、特定の実行の詳細を表示できます。 「MLflow を使用してモデル開発を追跡する」を参照してください。

モデルの読み込み

MLflow API を使用して、特定の実行の結果にアクセスすることもできます。 次のセルのコードは、特定の MLflow 実行でトレーニングされたモデルを読み込み、それを使用して予測を行う方法を示しています。 MLflow 実行ページで特定のモデルを読み込むためのコード スニペットを見つけることもできます。

# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=run.info.run_id
  )
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))

第 2 部 ハイパーパラメーターの調整

この時点で、単純なモデルをトレーニングし、MLflow 追跡サービスを使用して作業を整理しました。 次に、Optuna を使用して、より高度なチューニングを実行できます。

Optuna を使用した並列トレーニング

Optuna は、ハイパーパラメーター調整用のオープンソース Python ライブラリであり、複数のコンピューティング リソース間で水平方向にスケーリングできます。 Databricks で Optuna を使用する方法の詳細については、「 Optuna を使用したハイパーパラメーターのチューニング」を参照してください。

def objective(trial):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    params = {
      'n_estimators': trial.suggest_int('n_estimators', 20, 1000),
      'learning_rate': trial.suggest_float('learning_rate', 0.05, 1.0, log=True),
      'max_depth': trial.suggest_int('max_depth', 2, 5),
    }
    model_hp = sklearn.ensemble.GradientBoostingClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    predicted_probs = model_hp.predict_proba(X_test)
    # Tune based on the test AUC
    # In production, you could use a separate validation set instead
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric('test_auc', roc_auc)

    # Negate the AUC because Optuna minimizes the objective by default
    return -roc_auc


with mlflow.start_run(run_name='gb_optuna') as run:
  # Use the MLflow Tracking Server as the Optuna storage backend
  experiment_id = mlflow.active_run().info.experiment_id
  mlflow_storage = MlflowStorage(experiment_id=experiment_id)

  # MlflowSparkStudy distributes the tuning using Spark workers
  mlflow_study = MlflowSparkStudy(
    study_name="gb-optuna-tuning",
    storage=mlflow_storage,
  )

  mlflow_study.optimize(objective, n_trials=32, n_jobs=4)

最適なモデルを取得するための検索実行

すべての実行は MLflow によって追跡されるため、MLflow 検索実行 API を使用して最適な実行のメトリックとパラメーターを取得し、テスト auc が最も高いチューニング実行を見つけることができます。

このチューニングされたモデルは、パート 1 でトレーニングされたより単純なモデルよりも優れたパフォーマンスを発揮する必要があります。

# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
  order_by=['metrics.test_auc DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)

# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)

パート 3. 結果とモデルを Unity カタログに保存する

predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")

results = spark.createDataFrame(best_model_predictions)

# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
model_uri = 'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )

mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")

パート 4. モデルのデプロイ

モデルを Unity カタログに保存した後、サービス UI を使用してモデルをデプロイできます。 次の手順では、簡単な説明を示します。 詳細については、「 カスタム モデル サービス エンドポイントの作成」を参照してください。

  1. サイドバーで [Serving]\( サービス \) をクリックして、サービス UI を表示します。

モデル提供 UI

  1. [ サービス エンドポイントの作成] をクリックします。

  2. [名前] フィールドに、エンドポイントの名前を指定します。

  3. [Served entities]\(提供されるエンティティ\) セクションで、次のようにします。

    1. [エンティティ] フィールドをクリックして、[Select served entity] (提供されるエンティティの選択) フォームを開きます。
    2. [ マイ モデル- Unity カタログ] を選択します。 フォームは、選択内容に基づいて動的に更新されます。
    3. wine_quality_model とモデルの選択し、提供するバージョンを指定します。
    4. サービス済みモデルにルーティングするトラフィックの割合として 100 を選択します。
    5. この例のコンピューティングの種類として [CPU ] を選択します。
    6. [ コンピューティング スケールアウト] で、コンピューティング スケールアウト サイズとして [小] を選択します。
  4. Create をクリックしてください。 [ サービス エンドポイント] ページが 表示され、 サービス エンドポイントの状態[準備ができていません] と表示されます。

  5. エンドポイントの 準備ができたら、[ 使用 ] を選択して、エンドポイントに推論要求を送信します。

ノートブックの例

はじめに: Databricks で初めての機械学習モデルを構築する

ノートブックを入手