И как PySpark может помочь вам профессионально обрабатывать огромные наборы данных
Среды машинного обучения, такие как PyTorch и TensorFlow, отлично подходят для создания моделей. Но реальность такова, что когда дело доходит до реальных проектов, где вы имеете дело с гигантскими наборами данных, вам нужно нечто большее, чем просто хорошая модель. Вам нужен способ эффективной обработки и управления всеми этими данными. Именно здесь на помощь приходят распределенные вычисления, такие как PySpark.
Давайте разберемся, почему обработка больших данных в реальном машинном обучении означает выход за рамки PyTorch и TensorFlow, и как PySpark помогает вам в этом.
Настоящая проблема: большие данные
Большинство примеров машинного обучения, которые вы видите в Интернете, используют небольшие управляемые наборы данных. Вы можете вместить все это в память, поиграть с этим и обучить модель за считанные минуты. Но в реальных сценариях — таких как обнаружение мошенничества с кредитными картами, системы рекомендаций или финансовые прогнозы — вы имеете дело с миллионами или даже миллиардами строк. Внезапно ваш ноутбук или сервер не справится с этим.
Если вы попытаетесь загрузить все эти данные в PyTorch или TensorFlow одновременно, все сломается. Эти платформы предназначены для обучения моделей, а не для эффективной обработки огромных наборов данных. Именно здесь распределенные вычисления становятся решающими.
Почему PyTorch и TensorFlow недостаточно
PyTorch и TensorFlow отлично подходят для создания и оптимизации моделей, но их не хватает при работе с крупномасштабными задачами по работе с данными. Две основные проблемы:
Вот где сияет PySpark. Он предназначен для работы с распределенными данными, эффективной их обработки на нескольких машинах и обработки огромных наборов данных без сбоя в работе вашей системы.
Пример из реальной жизни: обнаружение мошенничества с кредитными картами с помощью PySpark
Давайте углубимся в пример. Предположим, вы работаете над системой обнаружения мошенничества, используя данные транзакций по кредитным картам. В этом случае мы будем использовать популярный набор данных от Kaggle. Он содержит более 284 000 транзакций, из которых менее 1% являются мошенническими.
Шаг 1. Настройте PySpark в Google Colab
Мы будем использовать для этого Google Colab, поскольку он позволяет нам запускать PySpark с минимальной настройкой.
!pip install pyspark
Затем импортируйте необходимые библиотеки и запустите сеанс Spark.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
начать сеанс pyspark
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
Шаг 2. Загрузите и подготовьте данные
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
Шаг 3. Инициализация модели
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
Шаг 4. Подберите, запустите перекрестную проверку и выберите лучший набор параметров
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Шаг 5. Функция для расчета точности, полноты и показателя F1
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
Шаг 6. Найдите лучший порог для модели
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Шаг 7. Оценка невидимых данных
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
РЕЗУЛЬТАТЫ
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
Затем вы можете сохранить эту модель (несколько КБ) и использовать ее где угодно в конвейере pyspark
rf_model.save()
Вот почему PySpark имеет огромное значение при работе с большими наборами данных в реальных задачах машинного обучения:
Легко масштабируется: PySpark может распределять задачи по кластерам, позволяя обрабатывать терабайты данных без нехватки памяти.
Обработка данных «на лету». PySpark не нужно загружать весь набор данных в память. Он обрабатывает данные по мере необходимости, что делает его гораздо более эффективным.
Более быстрое обучение моделей. Благодаря распределенным вычислениям вы можете обучать модели быстрее, распределяя вычислительную нагрузку между несколькими компьютерами.
Заключительные мысли
PyTorch и TensorFlow — фантастические инструменты для построения моделей машинного обучения, но для реальных крупномасштабных задач нужно нечто большее. Распределенные вычисления с помощью PySpark позволяют эффективно обрабатывать огромные наборы данных, обрабатывать данные в режиме реального времени и масштабировать конвейеры машинного обучения.
Итак, в следующий раз, когда вы будете работать с огромными данными — будь то обнаружение мошенничества, системы рекомендаций или финансовый анализ — рассмотрите возможность использования PySpark, чтобы вывести свой проект на новый уровень.
Полный код и результаты можно найти в этом блокноте. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
Я Swapnil, не стесняйтесь оставлять комментарии. Результаты и идеи или пишите мне по адресу [email protected], чтобы узнать о данных, концертах разработчиков программного обеспечения и вакансиях
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3