Et comment PySpark peut vous aider à gérer d'énormes ensembles de données comme un pro
Les frameworks d'apprentissage automatique comme PyTorch et TensorFlow sont géniaux pour créer des modèles. Mais la réalité est que lorsqu’il s’agit de projets du monde réel, où vous traitez de gigantesques ensembles de données, vous avez besoin de plus qu’un simple bon modèle. Vous avez besoin d’un moyen de traiter et de gérer efficacement toutes ces données. C'est là que l'informatique distribuée, comme PySpark, entre en jeu pour sauver la situation.
Expliquons pourquoi la gestion du Big Data dans le monde réel de l'apprentissage automatique signifie aller au-delà de PyTorch et TensorFlow, et comment PySpark vous aide à y parvenir.
Le vrai problème : le Big Data
La plupart des exemples de ML que vous voyez en ligne utilisent de petits ensembles de données gérables. Vous pouvez mettre le tout en mémoire, jouer avec et entraîner un modèle en quelques minutes. Mais dans des scénarios réels, comme la détection de fraude par carte de crédit, les systèmes de recommandation ou les prévisions financières, vous avez affaire à des millions, voire des milliards de lignes. Soudain, votre ordinateur portable ou votre serveur ne peut plus le gérer.
Si vous essayez de charger toutes ces données dans PyTorch ou TensorFlow en même temps, les choses vont se casser. Ces frameworks sont conçus pour la formation de modèles, et non pour gérer efficacement d'énormes ensembles de données. C'est là que l'informatique distribuée devient cruciale.
Pourquoi PyTorch et TensorFlow ne suffisent pas
PyTorch et TensorFlow sont parfaits pour créer et optimiser des modèles, mais ils ne sont pas à la hauteur lorsqu'il s'agit de tâches de données à grande échelle. Deux problèmes majeurs :
C'est là que PySpark brille. Il est conçu pour fonctionner avec des données distribuées, en les traitant efficacement sur plusieurs machines tout en gérant des ensembles de données volumineux sans faire planter votre système.
Exemple concret : détection de fraude par carte de crédit avec PySpark
Prenons un exemple. Supposons que vous travailliez sur un système de détection de fraude utilisant les données de transactions par carte de crédit. Dans ce cas, nous utiliserons un ensemble de données populaire de Kaggle. Il contient plus de 284 000 transactions, et moins de 1 % d'entre elles sont frauduleuses.
Étape 1 : configurer PySpark dans Google Colab
Nous utiliserons Google Colab pour cela car il nous permet d'exécuter PySpark avec une configuration minimale.
!pip install pyspark
Ensuite, importez les bibliothèques nécessaires et démarrez une session Spark.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
démarrer une session pyspark
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
Étape 2 : Charger et préparer les données
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
Étape 3 : initialiser le modèle
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
Étape 4 : Ajuster, exécuter la validation croisée et choisir le meilleur ensemble de paramètres
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Étape 5 Fonction pour calculer la précision, le rappel et le score F1
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
Étape 6 : Trouver le meilleur seuil pour le modèle
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Étape 7 : Évaluer sur des données invisibles
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
RÉSULTATS
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
Vous pouvez ensuite enregistrer ce modèle (quelques Ko) et l'utiliser n'importe où dans le pipeline pyspark
rf_model.save()
Voici pourquoi PySpark fait une énorme différence lorsqu'il s'agit de traiter de grands ensembles de données dans des tâches d'apprentissage automatique réelles :
Il évolue facilement : PySpark peut répartir les tâches sur plusieurs clusters, vous permettant de traiter des téraoctets de données sans manquer de mémoire.
Traitement des données à la volée : PySpark n'a pas besoin de charger l'intégralité de l'ensemble de données en mémoire. Il traite les données selon les besoins, ce qui le rend beaucoup plus efficace.
Formation de modèles plus rapide : grâce à l'informatique distribuée, vous pouvez former des modèles plus rapidement en répartissant la charge de travail de calcul sur plusieurs machines.
Pensées finales
PyTorch et TensorFlow sont des outils fantastiques pour créer des modèles d'apprentissage automatique, mais pour les tâches réelles à grande échelle, vous avez besoin de plus. L'informatique distribuée avec PySpark vous permet de gérer efficacement d'énormes ensembles de données, de traiter les données en temps réel et de faire évoluer vos pipelines d'apprentissage automatique.
Donc, la prochaine fois que vous travaillerez avec des données massives, qu'il s'agisse de détection de fraude, de systèmes de recommandation ou d'analyse financière, pensez à utiliser PySpark pour faire passer votre projet au niveau supérieur.
Pour le code complet et les résultats, consultez ce bloc-notes. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
Je suis Swapnil, n'hésitez pas à laisser vos commentaires Résultats et idées, ou envoyez-moi un ping - [email protected] pour les données, les concerts et les emplois de développement de logiciels
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3