Und wie PySpark Ihnen dabei helfen kann, riesige Datensätze wie ein Profi zu verarbeiten
Frameworks für maschinelles Lernen wie PyTorch und TensorFlow eignen sich hervorragend zum Erstellen von Modellen. Die Realität ist jedoch, dass Sie bei realen Projekten, bei denen es um riesige Datensätze geht, mehr als nur ein gutes Modell benötigen. Sie benötigen eine Möglichkeit, all diese Daten effizient zu verarbeiten und zu verwalten. Hier kommt verteiltes Computing wie PySpark ins Spiel, um den Tag zu retten.
Lassen Sie uns erläutern, warum der Umgang mit Big Data beim realen maschinellen Lernen bedeutet, über PyTorch und TensorFlow hinauszugehen, und wie PySpark Ihnen dabei hilft, dorthin zu gelangen.
Das eigentliche Problem: Big Data
Die meisten ML-Beispiele, die Sie online sehen, verwenden kleine, überschaubare Datensätze. Sie können das Ganze in Ihrem Speicher speichern, damit herumspielen und in wenigen Minuten ein Modell trainieren. Aber in realen Szenarien – wie der Erkennung von Kreditkartenbetrug, Empfehlungssystemen oder Finanzprognosen – haben Sie es mit Millionen oder sogar Milliarden von Zeilen zu tun. Plötzlich kann Ihr Laptop oder Server damit nicht mehr umgehen.
Wenn Sie versuchen, alle Daten auf einmal in PyTorch oder TensorFlow zu laden, wird es kaputt gehen. Diese Frameworks sind für das Modelltraining konzipiert und nicht für den effizienten Umgang mit großen Datenmengen. Hier kommt der verteilten Datenverarbeitung eine entscheidende Bedeutung zu.
Warum PyTorch und TensorFlow nicht ausreichen
PyTorch und TensorFlow eignen sich hervorragend zum Erstellen und Optimieren von Modellen, sind jedoch bei der Bewältigung umfangreicher Datenaufgaben unzureichend. Zwei Hauptprobleme:
Hier glänzt PySpark. Es ist darauf ausgelegt, mit verteilten Daten zu arbeiten, diese effizient auf mehreren Computern zu verarbeiten und gleichzeitig riesige Datensätze zu verarbeiten, ohne dass Ihr System abstürzt.
Beispiel aus der Praxis: Erkennung von Kreditkartenbetrug mit PySpark
Schauen wir uns ein Beispiel an. Angenommen, Sie arbeiten an einem Betrugserkennungssystem, das Kreditkartentransaktionsdaten verwendet. In diesem Fall verwenden wir einen beliebten Datensatz von Kaggle. Es enthält über 284.000 Transaktionen, von denen weniger als 1 % betrügerisch sind.
Schritt 1: PySpark in Google Colab einrichten
Wir verwenden hierfür Google Colab, da wir damit PySpark mit minimalem Setup ausführen können.
!pip install pyspark
Als nächstes importieren Sie die erforderlichen Bibliotheken und starten eine Spark-Sitzung.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
Starten Sie eine Pyspark-Sitzung
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
Schritt 2: Daten laden und vorbereiten
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
Schritt 3: Modell initialisieren
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
Schritt 4: Anpassen, Kreuzvalidierung durchführen und den besten Parametersatz auswählen
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Schritt 5 Funktion zur Berechnung von Präzision, Rückruf und F1-Score
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
Schritt 6: Finden Sie den besten Schwellenwert für das Modell
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Schritt 7: Auswertung anhand unsichtbarer Daten
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
ERGEBNISSE
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
Sie können dieses Modell dann speichern (wenige KB) und es überall in der Pyspark-Pipeline verwenden
rf_model.save()
Hier erfahren Sie, warum PySpark beim Umgang mit großen Datensätzen in realen maschinellen Lernaufgaben einen großen Unterschied macht:
Es lässt sich leicht skalieren: PySpark kann Aufgaben über Cluster verteilen, sodass Sie Terabytes an Daten verarbeiten können, ohne dass Ihnen der Arbeitsspeicher ausgeht.
Datenverarbeitung im laufenden Betrieb: PySpark muss nicht den gesamten Datensatz in den Speicher laden. Es verarbeitet die Daten nach Bedarf, was es deutlich effizienter macht.
Schnelleres Modelltraining: Mit verteiltem Computing können Sie Modelle schneller trainieren, indem Sie die Rechenlast auf mehrere Maschinen verteilen.
Abschließende Gedanken
PyTorch und TensorFlow sind fantastische Tools zum Erstellen von Modellen für maschinelles Lernen, aber für reale, groß angelegte Aufgaben benötigen Sie mehr. Durch verteiltes Computing mit PySpark können Sie große Datensätze effizient verarbeiten, Daten in Echtzeit verarbeiten und Ihre Pipelines für maschinelles Lernen skalieren.
Wenn Sie also das nächste Mal mit riesigen Datenmengen arbeiten – sei es Betrugserkennung, Empfehlungssysteme oder Finanzanalysen –, sollten Sie PySpark verwenden, um Ihr Projekt auf die nächste Stufe zu heben.
Den vollständigen Code und die Ergebnisse finden Sie in diesem Notebook. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
Ich bin Swapnil, hinterlassen Sie gerne Ihre Kommentare, Ergebnisse und Ideen, oder pingen Sie mich an – [email protected] für Daten, Software-Entwickler-Auftritte und Jobs
Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.
Copyright© 2022 湘ICP备2022001581号-3