PySpark がプロのように巨大なデータセットを処理するのにどのように役立つか
PyTorch や TensorFlow などの機械学習フレームワークは、モデルの構築に最適です。しかし現実には、巨大なデータセットを扱う現実世界のプロジェクトとなると、単なる優れたモデル以上のものが必要になります。すべてのデータを効率的に処理および管理する方法が必要です。そこで窮地を救うために、PySpark のような分散コンピューティングが登場します。
現実世界の機械学習でビッグデータを扱うことが、PyTorch や TensorFlow を超えることを意味する理由と、そこに到達するのに PySpark がどのように役立つのかを詳しく見てみましょう。
本当の問題: ビッグデータ
オンラインで見るほとんどの ML サンプルでは、小さくて管理しやすいデータセットが使用されています。すべてをメモリに格納し、試して、モデルを数分でトレーニングできます。しかし、クレジット カード不正行為の検出、推奨システム、財務予測などの現実のシナリオでは、数百万、場合によっては数十億の行を処理することになります。突然、ラップトップまたはサーバーが処理できなくなります。
すべてのデータを一度に PyTorch または TensorFlow にロードしようとすると、問題が発生します。これらのフレームワークは、巨大なデータセットを効率的に処理するためではなく、モデルのトレーニングのために設計されています。ここで分散コンピューティングが重要になります。
PyTorch と TensorFlow だけでは不十分な理由
PyTorch と TensorFlow はモデルの構築と最適化には最適ですが、大規模なデータ タスクを扱う場合には不十分です。 2 つの大きな問題:
ここで PySpark が威力を発揮します。分散データを操作するように設計されており、システムをクラッシュさせることなく大量のデータセットを処理しながら、複数のマシン間でデータを効率的に処理します。
実際の例: PySpark を使用したクレジット カード詐欺の検出
例を見てみましょう。クレジット カード取引データを使用した不正検出システムに取り組んでいるとします。この場合、Kaggle の人気のあるデータセットを使用します。これには 284,000 件を超える取引が含まれており、そのうち不正なものは 1% 未満です。
ステップ 1: Google Colab で PySpark をセットアップする
最小限のセットアップで PySpark を実行できるため、これには Google Colab を使用します。
!pip install pyspark
次に、必要なライブラリをインポートし、Spark セッションを開始します。
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
pyspark セッションを開始します
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
ステップ 2: データのロードと準備
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
ステップ 3: モデルを初期化する
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
ステップ 4: 適合、相互検証を実行し、最適なパラメータのセットを選択します
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
ステップ 5 適合率、再現率、F1 スコアを計算する関数
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
ステップ 6: モデルの最適なしきい値を見つける
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
ステップ 7: 目に見えないデータを評価する
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
結果
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
このモデル (数 KB) を保存して、pyspark パイプラインのどこでも使用できます
rf_model.save()
現実の機械学習タスクで大規模なデータセットを扱うときに PySpark が大きな違いを生む理由は次のとおりです:
簡単に拡張できます: PySpark はクラスター全体にタスクを分散できるため、メモリ不足になることなくテラバイト規模のデータを処理できます。
オンザフライのデータ処理: PySpark はデータセット全体をメモリにロードする必要がありません。必要に応じてデータを処理するため、効率が大幅に向上します。
モデル トレーニングの高速化: 分散コンピューティングを使用すると、計算ワークロードを複数のマシンに分散することで、モデルをより速くトレーニングできます。
最終的な考え
PyTorch と TensorFlow は機械学習モデルを構築するための素晴らしいツールですが、現実世界の大規模なタスクにはさらに多くのツールが必要です。 PySpark を使用した分散コンピューティングにより、巨大なデータセットを効率的に処理し、リアルタイムでデータを処理し、機械学習パイプラインを拡張することができます。
したがって、次回不正検出、推奨システム、財務分析などの大規模なデータを扱うときは、PySpark を使用してプロジェクトを次のレベルに引き上げることを検討してください。
完全なコードと結果については、このノートブックを参照してください。 :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
私は Swapnil です。お気軽にコメントを残してください。結果やアイデア、またはデータ、ソフトウェア開発の仕事や仕事については [email protected] にメールしてください
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3