Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som hanterar förutsägelseanalys på en Azure öppen datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.
Den här handledningen täcker följande steg:
- Konfigurera notebook och importer
- Läs in och sampla taxidata från NYC
- Förbereda och skapa funktioner
- Koda kategoriska funktioner
- Träna logistisk regressionsmodell
- Utvärdera och visualisera resultat
De grundläggande SparkML- och MLlib Spark-biblioteken tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter. Dessa verktyg är lämpliga för:
- Klassificering
- Klustring
- Hypotestestning och beräkning av exempelstatistik
- Regression
- Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
- Ämnesmodellering
Förutsättningar
Skaffa en Microsoft Fabric-prenumeration. Eller registrera dig för en kostnadsfri utvärderingsversion av Microsoft Fabric.
Logga in på Microsoft Fabric.
Växla till Fabric med hjälp av upplevelseväxlaren längst ned till vänster på startsidan.
- Skapa vid behov ett Microsoft Fabric sjöhus enligt beskrivningen i Skapa ett sjöhus i Microsoft Fabric.
- Skapa en ny anteckningsbok i din arbetsyta genom att välja + och sedan Notebook. Mer information finns i Skapa en notebook-fil.
Förstå klassificering och logistisk regression
Klassificering, en populär maskininlärningsuppgift, omfattar sortering av indata i kategorier. En klassificeringsalgoritm beskriver hur du tilldelar etiketter till angivna indata. Till exempel kan en maskininlärningsalgoritm acceptera aktieinformation som indata och dela in aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.
Algoritmen för logistisk regression är användbar för klassificering. Api:et för logistisk regression i Spark är användbart för binär klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.
Logistisk regression skapar en logistisk funktion som förutsäger sannolikheten för att en indatavektor tillhör den ena gruppen eller den andra.
Exempel på förutsägelseanalys av NYC-taxidata
Data är tillgängliga via Azure Open Datasets-resursen . Den här delmängden innehåller information om gula taxiresor, inklusive starttider, sluttider, startplatser, slutplatser, resekostnader och andra attribut.
I den här självstudien används Apache Spark för att analysera dricksdata för taxiresor i New York och utveckla en modell för att förutsäga om en viss resa inkluderar dricks.
Skapa en Apache Spark-maskininlärningsmodell
Skapa en PySpark-anteckningsbok. Mer information finns i Skapa en notebook-fil.
När du har skapat anteckningsboken ansluter du den till ett lakehouse genom att välja Lägg till lakehouse i panelen till vänster.
Importera de typer som krävs för den här notebooken. Klistra in följande kod i den första cellen och kör den.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorVerifiera: Cellen slutförs utan
ImportError. Om du ser ett fel, bekräfta att din notebook använder PySpark-körmiljön.Använd MLflow för att spåra dina maskininlärningsexperiment och motsvarande körningar. Om Microsoft Fabric Autologging är aktiverat registreras motsvarande mått och parametrar automatiskt.
import mlflowKontrollera: Cellen slutförs utan fel. Kör
print(mlflow.__version__)för att bekräfta att MLflow är tillgängligt.
Konstruera indataramen
Det här exemplet läser in data från Azure Open Datasets lagring till en Apache Spark DataFrame. Sedan använder du Spark-åtgärder för att rensa och filtrera datamängden.
Klistra in följande kod i en ny cell och kör den för att skapa en Spark DataFrame. Det här steget hämtar NYC:s gula taxidata som har filtrerats för maj 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Verifiera: Kör följande cell för att kontrollera att data läses in korrekt.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsExempel på datauppsättningen för att påskynda utveckling och träning.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Kontrollera: Bekräfta att exempelstorleken är hanterbar.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsVisa data med hjälp av det inbyggda
display()kommandot för att utforska dataexemplet.display(sampled_taxi_df.limit(10))Kontrollera: En tabell med 10 rader visas som visar kolumner som
tpepPickupDateTime,fareAmount,tipAmountochtripDistance.
Förbereda data
Dataförberedelse är ett viktigt steg i maskininlärningsprocessen. Det handlar om att rensa, transformera och organisera rådata för att göra dem lämpliga för analys och modellering. I det här avsnittet utför du flera dataförberedelsesteg:
- Filtrera datamängden för att ta bort avvikande värden och felaktiga värden.
- Ta bort kolumner som inte behövs för modellträning.
- Skapa nya kolumner från rådata.
- Generera en etikett för att avgöra om en viss taxiresa innehåller ett tips.
Kör följande kod för att välja relevanta kolumner, beräkningsbaserade funktioner och filtrera extremvärden:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Viktigt!
Funktionen date_format använder mönstret 'HH' (24-timmarsformat, värden 0–23) i stället 'hh' för (12-timmarsformat, värden 1–12). 24-timmarsformatet krävs för den tidsbaserade binningslogik som följer.
Lägg sedan till funktionen för intervall för trafiktid baserat på timmen på dagen:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Verifiera: Bekräfta att tidsintervallen för trafiken har fördelats korrekt.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Skapa en logistisk regressionsmodell
Den sista uppgiften konverterar etiketterade data till ett format som logistisk regression kan hantera. Indata till en logistisk regressionsalgoritm måste ha en struktur för etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.
Konvertera de kategoriska kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer med hjälp OneHotEncoder av metoden:
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Verifiera: Bekräfta att den kodade dataramen har de förväntade nya kolumnerna.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Träna en logistisk regressionsmodell
Dela upp datamängden i en träningsuppsättning (70%) och en testuppsättning (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Kontrollera: Bekräfta att uppdelningen gav rimliga storlekar.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Skapa modellformeln, träna den logistiska regressionsmodellen och utvärdera den med hjälp av arean under ROC-kurvan (mottagaroperatorns karakteristikkurva):
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Kontrollera: Utdata visar ett AUC-värde. En högpresterande modell genererar ett värde nära 1,0.
Area under ROC = 0.97 (approximately)
Note
Det exakta AUC-värdet varierar beroende på dataexemplet. Värden över 0,90 indikerar stark förutsägelseprestanda för den här datamängden.
Skapa en visuell representation av förutsägelsen
Skapa en slutlig visualisering för att tolka modellresultatet. En ROC-kurva visar kompromissen mellan sann positiv ränta och falsk positiv ränta.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Kontrollera: En ritning visas som visar ROC-kurvan ovanför den röda streckade diagonallinjen. Kurvan bör böjas mot det övre vänstra hörnet, vilket indikerar stark klassificeringsprestanda.
Rensa resurser
När du har slutfört den här självstudien tar du bort anteckningsboken och lakehouset för att frigöra arbetsytans kapacitet:
- Högerklicka på anteckningsboken på arbetsytan och välj Ta bort.
- Om du har skapat ett lakehouse specifikt för den här självstudien högerklickar du på det och väljer Ta bort.
Om du vill bevara den tränade modellen för framtida användning lägger du till följande kod före rensningen:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Felsökning
| Problematik | Orsak | Lösning |
|---|---|---|
Py4JJavaError vid läsning av Parquet |
Nätverksanslutning till Azure bloblagring | Kontrollera att din Fabric arbetsyta har utgående Internetåtkomst. Prova att starta om Spark-sessionen. |
AnalysisException: cannot resolve column |
Stavfel i kolumnnamn eller att schemat inte matchar | Kör nyc_tlc_df.printSchema() för att granska tillgängliga kolumner. Schemat för NYC-taxidatamängden kan ändras mellan år. |
| Tom dataram efter filtrering | Filtervillkor är för restriktiva för datafönstret | Öka datumintervallet eller kontrollera sampled_taxi_df.count() innan du filtrerar. |
IllegalArgumentException i StringIndexer |
Osedda etiketter under transformering | Lägg till handleInvalid="skip" i dina StringIndexer samtal: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| Låg AUC (under 0,6) | Otillräckliga data eller felaktig funktionsutveckling | Öka exempelfraktionen (till exempel 0.01 i stället för 0.001) och kontrollera att trafficTimeBins kategorierna är balanserade. |
OutOfMemoryError |
Datauppsättningen är för stor för tillgänglig kapacitet | Minska samplingsandelen eller höj din Fabric-kapacitetsnivå. |
| ROC-diagrammet visas inte | Problem med Matplotlib-backend i notebook | Lägg till %matplotlib inline överst i anteckningsboken. |
Relaterat innehåll
- Använda AI-exempel för att skapa maskininlärningsmodeller: Använda AI-exempel
- Spåra maskininlärningskörningar med experiment: Maskininlärningsexperiment