Skapa en maskininlärningsmodell med Apache Spark MLlib

I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som hanterar förutsägelseanalys på en Azure öppen datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.

Den här handledningen täcker följande steg:

  • Konfigurera notebook och importer
  • Läs in och sampla taxidata från NYC
  • Förbereda och skapa funktioner
  • Koda kategoriska funktioner
  • Träna logistisk regressionsmodell
  • Utvärdera och visualisera resultat

De grundläggande SparkML- och MLlib Spark-biblioteken tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter. Dessa verktyg är lämpliga för:

  • Klassificering
  • Klustring
  • Hypotestestning och beräkning av exempelstatistik
  • Regression
  • Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
  • Ämnesmodellering

Förutsättningar

Förstå klassificering och logistisk regression

Klassificering, en populär maskininlärningsuppgift, omfattar sortering av indata i kategorier. En klassificeringsalgoritm beskriver hur du tilldelar etiketter till angivna indata. Till exempel kan en maskininlärningsalgoritm acceptera aktieinformation som indata och dela in aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.

Algoritmen för logistisk regression är användbar för klassificering. Api:et för logistisk regression i Spark är användbart för binär klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.

Logistisk regression skapar en logistisk funktion som förutsäger sannolikheten för att en indatavektor tillhör den ena gruppen eller den andra.

Exempel på förutsägelseanalys av NYC-taxidata

Data är tillgängliga via Azure Open Datasets-resursen . Den här delmängden innehåller information om gula taxiresor, inklusive starttider, sluttider, startplatser, slutplatser, resekostnader och andra attribut.

I den här självstudien används Apache Spark för att analysera dricksdata för taxiresor i New York och utveckla en modell för att förutsäga om en viss resa inkluderar dricks.

Skapa en Apache Spark-maskininlärningsmodell

  1. Skapa en PySpark-anteckningsbok. Mer information finns i Skapa en notebook-fil.

    När du har skapat anteckningsboken ansluter du den till ett lakehouse genom att välja Lägg till lakehouse i panelen till vänster.

  2. Importera de typer som krävs för den här notebooken. Klistra in följande kod i den första cellen och kör den.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Verifiera: Cellen slutförs utan ImportError. Om du ser ett fel, bekräfta att din notebook använder PySpark-körmiljön.

  3. Använd MLflow för att spåra dina maskininlärningsexperiment och motsvarande körningar. Om Microsoft Fabric Autologging är aktiverat registreras motsvarande mått och parametrar automatiskt.

    import mlflow
    

    Kontrollera: Cellen slutförs utan fel. Kör print(mlflow.__version__) för att bekräfta att MLflow är tillgängligt.

Konstruera indataramen

Det här exemplet läser in data från Azure Open Datasets lagring till en Apache Spark DataFrame. Sedan använder du Spark-åtgärder för att rensa och filtrera datamängden.

  1. Klistra in följande kod i en ny cell och kör den för att skapa en Spark DataFrame. Det här steget hämtar NYC:s gula taxidata som har filtrerats för maj 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Verifiera: Kör följande cell för att kontrollera att data läses in korrekt.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Exempel på datauppsättningen för att påskynda utveckling och träning.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Kontrollera: Bekräfta att exempelstorleken är hanterbar.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Visa data med hjälp av det inbyggda display() kommandot för att utforska dataexemplet.

    display(sampled_taxi_df.limit(10))
    

    Kontrollera: En tabell med 10 rader visas som visar kolumner som tpepPickupDateTime, fareAmount, tipAmountoch tripDistance.

Förbereda data

Dataförberedelse är ett viktigt steg i maskininlärningsprocessen. Det handlar om att rensa, transformera och organisera rådata för att göra dem lämpliga för analys och modellering. I det här avsnittet utför du flera dataförberedelsesteg:

  • Filtrera datamängden för att ta bort avvikande värden och felaktiga värden.
  • Ta bort kolumner som inte behövs för modellträning.
  • Skapa nya kolumner från rådata.
  • Generera en etikett för att avgöra om en viss taxiresa innehåller ett tips.

Kör följande kod för att välja relevanta kolumner, beräkningsbaserade funktioner och filtrera extremvärden:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Viktigt!

Funktionen date_format använder mönstret 'HH' (24-timmarsformat, värden 0–23) i stället 'hh' för (12-timmarsformat, värden 1–12). 24-timmarsformatet krävs för den tidsbaserade binningslogik som följer.

Lägg sedan till funktionen för intervall för trafiktid baserat på timmen på dagen:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Verifiera: Bekräfta att tidsintervallen för trafiken har fördelats korrekt.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Skapa en logistisk regressionsmodell

Den sista uppgiften konverterar etiketterade data till ett format som logistisk regression kan hantera. Indata till en logistisk regressionsalgoritm måste ha en struktur för etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.

Konvertera de kategoriska kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer med hjälp OneHotEncoder av metoden:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Verifiera: Bekräfta att den kodade dataramen har de förväntade nya kolumnerna.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Träna en logistisk regressionsmodell

Dela upp datamängden i en träningsuppsättning (70%) och en testuppsättning (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Kontrollera: Bekräfta att uppdelningen gav rimliga storlekar.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Skapa modellformeln, träna den logistiska regressionsmodellen och utvärdera den med hjälp av arean under ROC-kurvan (mottagaroperatorns karakteristikkurva):

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Kontrollera: Utdata visar ett AUC-värde. En högpresterande modell genererar ett värde nära 1,0.

Area under ROC = 0.97 (approximately)

Note

Det exakta AUC-värdet varierar beroende på dataexemplet. Värden över 0,90 indikerar stark förutsägelseprestanda för den här datamängden.

Skapa en visuell representation av förutsägelsen

Skapa en slutlig visualisering för att tolka modellresultatet. En ROC-kurva visar kompromissen mellan sann positiv ränta och falsk positiv ränta.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Kontrollera: En ritning visas som visar ROC-kurvan ovanför den röda streckade diagonallinjen. Kurvan bör böjas mot det övre vänstra hörnet, vilket indikerar stark klassificeringsprestanda.

Diagram som visar ROC-kurvan för logistisk regression i tipsmodellen.

Rensa resurser

När du har slutfört den här självstudien tar du bort anteckningsboken och lakehouset för att frigöra arbetsytans kapacitet:

  1. Högerklicka på anteckningsboken på arbetsytan och välj Ta bort.
  2. Om du har skapat ett lakehouse specifikt för den här självstudien högerklickar du på det och väljer Ta bort.

Om du vill bevara den tränade modellen för framtida användning lägger du till följande kod före rensningen:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Felsökning

Problematik Orsak Lösning
Py4JJavaError vid läsning av Parquet Nätverksanslutning till Azure bloblagring Kontrollera att din Fabric arbetsyta har utgående Internetåtkomst. Prova att starta om Spark-sessionen.
AnalysisException: cannot resolve column Stavfel i kolumnnamn eller att schemat inte matchar Kör nyc_tlc_df.printSchema() för att granska tillgängliga kolumner. Schemat för NYC-taxidatamängden kan ändras mellan år.
Tom dataram efter filtrering Filtervillkor är för restriktiva för datafönstret Öka datumintervallet eller kontrollera sampled_taxi_df.count() innan du filtrerar.
IllegalArgumentException i StringIndexer Osedda etiketter under transformering Lägg till handleInvalid="skip" i dina StringIndexer samtal: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
Låg AUC (under 0,6) Otillräckliga data eller felaktig funktionsutveckling Öka exempelfraktionen (till exempel 0.01 i stället för 0.001) och kontrollera att trafficTimeBins kategorierna är balanserade.
OutOfMemoryError Datauppsättningen är för stor för tillgänglig kapacitet Minska samplingsandelen eller höj din Fabric-kapacitetsnivå.
ROC-diagrammet visas inte Problem med Matplotlib-backend i notebook Lägg till %matplotlib inline överst i anteckningsboken.