Klassificeringsuppgifter med SynapseML

Den här artikeln visar hur du utför en textklassificeringsuppgift med två metoder. Den ena metoden använder oformaterad pysparkoch den andra använder synapseml biblioteket. Båda metoderna ger samma prestanda, men belyser hur SynapseML minskar kodkomplexiteten jämfört med pyspark.

Uppgiften förutsäger om en kundgranskning av en bok som säljs på Amazon är bra (betyg > 3) eller dålig, baserat på recensionstexten. Du tränar LogisticRegression-elever med olika hyperparametrar och väljer sedan den bästa modellen.

Förutsättningar

  • Skapa en notebook.
  • Bifoga anteckningsboken till ett sjöhus. I anteckningsboken väljer du Lägg till i den vänstra rutan för att bifoga ett befintligt sjöhus eller skapa ett nytt.

Note

Alla bibliotek som används i den här artikeln (pyspark, synapseml, numpy) är förinstallerade i Fabric Spark-körningen. Du behöver inte installera några paket.

Läs in och utforska datan

I Fabric notebook-filer är en Spark-session redan tillgänglig som variabeln spark. Läs in datauppsättningen med Amazon-bokrecensioner från en offentlig plats i Azure Blob Storage:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Kontrollera att datauppsättningen har lästs in korrekt:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

Extrahera funktioner och bearbeta data

Verkliga data har ofta funktioner av flera typer, till exempel text, numeriska och kategoriska. Om du vill visa hur du arbetar med blandade funktionstyper lägger du till två numeriska funktioner i datamängden: antalet ord i recensionen och medelvärdet av ordlängden.

Definiera användardefinierade funktioner (UDF:er)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

Tillämpa UDF-filer med SynapseML UDFTransformer

Använd UDFTransformer från SynapseML för att kapsla in UDF:erna i pipelinekompatibla transformerare:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

Kör funktionspipelinen

Använd båda transformatorerna och skapa en binär etikettkolumn från betyget:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

Kontrollera funktionsutvinningen:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

Klassificera med pyspark

Om du vill välja den bästa LogisticRegression-klassificeraren med hjälp av pyspark biblioteket måste du uttryckligen utföra följande steg:

  1. Bearbeta funktionerna:
    • Tokenisera textkolumnen.
    • Hash den tokeniserade kolumnen till en vektor med hjälp av hashning.
    • Sammanfoga de numeriska funktionerna med vektorn.
  2. Omvandla etikettkolumnen från boolesk till heltalstyp.
  3. Träna flera LogisticRegression-algoritmer på datauppsättningen train med olika hyperparametrar.
  4. Beräkna området under ROC-kurvan (AUC) för varje tränad modell och välj den modell som har det högsta måttet på datamängden test .
  5. Utvärdera den bästa modellen på validation-mängden.

Funktionalisera och förbereda data

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

Verifiera de funktionaliserade data:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

Träna och utvärdera modeller

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

Kontrollera resultatet:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

Note

De exakta AUC-värdena beror på den slumpmässiga uppdelningen. Förvänta dig värden mellan 0,65 och 0,85.

Klassificera med SynapseML

Metoden synapseml uppnår samma resultat med färre steg. SynapseML hanterar funktionalisering internt, vilket minskar den kod du behöver skriva:

  1. Skattaren TrainClassifier omvandlar data internt till egenskaper, så länge kolumnerna i datauppsättningarna train, test och validation representerar egenskaperna.
  2. Skattaren FindBestModel hittar den bästa modellen från en pool med tränade modeller genom att utvärdera prestanda på datamängden test med det angivna måttet.
  3. Transformatorn ComputeModelStatistics beräknar flera mått på en poängsatt datamängd (i det här fallet datamängden validation ) samtidigt.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Kontrollera SynapseML-resultaten:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

Note

Metoderna pyspark och SynapseML bör generera liknande AUC-värden, eftersom de tränar samma modelltyp med samma hyperparametrar på samma data.

Jämför de två metoderna

Aspect pyspark SynapseML
Bearbetning av funktioner Manuell (Tokenizer -> HashingTF -> VectorAssembler) Automatisk (hanteras av TrainClassifier)
Modellval Manuell slinga med en utvärderare Inbyggd FindBestModel
Beräkning av mått Enskilt mått per utvärderingsanrop Flera mått med ComputeModelStatistics
Kodrader Cirka 30 rader Cirka 15 rader
Result Samma AUC Samma AUC

Felsökning

Problematik Orsak Lösning
AnalysisException: Path does not exist Den offentliga bloblagrings-URL:en är inte tillgänglig för tillfället Vänta några minuter och försök igen. Verifiera anslutningen genom att köra spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count()
IllegalArgumentException: Field "features" does not exist Funktionskolumnnamnen matchar inte mellan transformatorer Verifiera kolumnnamn genom att köra data.columns före VectorAssembler-steget
NameError: name 'LogisticRegression' is not defined Importuttryck saknas Lägg till from pyspark.ml.classification import LogisticRegression överst i cellen
ModuleNotFoundError: No module named 'synapse.ml' Notebook använder inte Fabric Spark-körning Kontrollera att anteckningsboken använder Fabric Runtime 1.2 eller senare. Välj Miljö i menyfliksområdet för att kontrollera.
Låg AUC (under 0,6) Problem med datadelning eller konvergens Kontrollera etikettfördelningen med data.groupBy("label").count().show(). Förvänta dig en ungefärligt balanserad datauppsättning.
Py4JJavaError: An error occurred while calling internt fel i Java/Spark I Spark-användargränssnittet finns detaljerade felloggar. Starta om Spark-sessionen genom att väljaSessionsstoppsession> och kör sedan alla celler igen.

Rensa resurser

Om du har skapat ett nytt lakehouse för den här artikeln och inte längre behöver det:

  1. I arbetsytan högerklickar du på namnet på lakehouse.
  2. Välj Ta bort.
  3. Bekräfta borttagningen.

Anteckningsboken finns kvar på arbetsytan om du inte tar bort den separat.