Y cómo PySpark puede ayudarle a manejar enormes conjuntos de datos como un profesional
Los marcos de aprendizaje automático como PyTorch y TensorFlow son fantásticos para crear modelos. Pero la realidad es que, cuando se trata de proyectos del mundo real (en los que se trata de conjuntos de datos gigantescos), se necesita algo más que un buen modelo. Necesita una manera de procesar y administrar eficientemente todos esos datos. Ahí es donde la informática distribuida, como PySpark, entra en juego para salvar el día.
Analicemos por qué manejar big data en el aprendizaje automático del mundo real significa ir más allá de PyTorch y TensorFlow, y cómo PySpark le ayuda a llegar allí.
El verdadero problema: Big Data
La mayoría de los ejemplos de ML que ve en línea utilizan conjuntos de datos pequeños y manejables. Puedes guardar todo en la memoria, jugar con él y entrenar un modelo en minutos. Pero en escenarios del mundo real (como la detección de fraudes con tarjetas de crédito, los sistemas de recomendación o los pronósticos financieros) se trata de millones o incluso miles de millones de filas. De repente, tu computadora portátil o servidor no puede soportarlo.
Si intentas cargar todos esos datos en PyTorch o TensorFlow a la vez, todo se romperá. Estos marcos están diseñados para el entrenamiento de modelos, no para manejar de manera eficiente grandes conjuntos de datos. Aquí es donde la informática distribuida se vuelve crucial.
Por qué PyTorch y TensorFlow no son suficientes
PyTorch y TensorFlow son excelentes para crear y optimizar modelos, pero se quedan cortos cuando se trata de tareas de datos a gran escala. Dos problemas importantes:
Aquí es donde brilla PySpark. Está diseñado para trabajar con datos distribuidos, procesándolos de manera eficiente en múltiples máquinas mientras maneja conjuntos de datos masivos sin bloquear su sistema.
Ejemplo del mundo real: detección de fraude con tarjetas de crédito con PySpark
Profundicemos en un ejemplo. Suponga que está trabajando en un sistema de detección de fraude utilizando datos de transacciones de tarjetas de crédito. En este caso, utilizaremos un conjunto de datos popular de Kaggle. Contiene más de 284.000 transacciones y menos del 1% de ellas son fraudulentas.
Paso 1: Configurar PySpark en Google Colab
Usaremos Google Colab para esto porque nos permite ejecutar PySpark con una configuración mínima.
!pip install pyspark
A continuación, importe las bibliotecas necesarias e inicie una sesión de Spark.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
iniciar una sesión de pyspark
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
Paso 2: cargar y preparar datos
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
Paso 3: Inicializar el modelo
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
Paso 4: Ajustar, ejecutar validación cruzada y elegir el mejor conjunto de parámetros
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Paso 5 Función para calcular precisión, recuperación y puntuación F1
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
Paso 6: Encuentre el mejor umbral para el modelo
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Paso 7: Evaluar datos no vistos
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
RESULTADOS
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
Luego puede guardar este modelo (unos pocos KB) y usarlo en cualquier lugar de pyspark pipeline
rf_model.save()
He aquí por qué PySpark marca una gran diferencia cuando se trata de grandes conjuntos de datos en tareas de aprendizaje automático del mundo real:
Se escala fácilmente: PySpark puede distribuir tareas entre clústeres, lo que le permite procesar terabytes de datos sin quedarse sin memoria.
Procesamiento de datos sobre la marcha: PySpark no necesita cargar todo el conjunto de datos en la memoria. Procesa los datos según sea necesario, lo que los hace mucho más eficientes.
Entrenamiento de modelos más rápido: con la computación distribuida, puede entrenar modelos más rápido distribuyendo la carga de trabajo computacional entre múltiples máquinas.
Pensamientos finales
PyTorch y TensorFlow son herramientas fantásticas para crear modelos de aprendizaje automático, pero para tareas del mundo real a gran escala, se necesitan más. La computación distribuida con PySpark le permite manejar enormes conjuntos de datos de manera eficiente, procesar datos en tiempo real y escalar sus procesos de aprendizaje automático.
Por lo tanto, la próxima vez que trabaje con datos masivos, ya sea detección de fraude, sistemas de recomendación o análisis financiero, considere usar PySpark para llevar su proyecto al siguiente nivel.
Para obtener el código completo y los resultados, consulta este cuaderno. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
Soy Swapnil, no dudes en dejar tus comentarios, resultados e ideas, o envíame un mensaje de correo electrónico a [email protected] para trabajos y trabajos de desarrollo de software y datos
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3