PySpark가 전문가처럼 대규모 데이터 세트를 처리하는 데 어떻게 도움이 되는지
PyTorch 및 TensorFlow와 같은 기계 학습 프레임워크는 모델 구축에 매우 적합합니다. 그러나 현실은 거대한 데이터 세트를 다루는 실제 프로젝트의 경우 좋은 모델 그 이상이 필요하다는 것입니다. 모든 데이터를 효율적으로 처리하고 관리할 수 있는 방법이 필요합니다. PySpark와 같은 분산 컴퓨팅이 문제를 해결하기 위해 등장하는 곳입니다.
실제 머신러닝에서 빅데이터를 처리하는 것이 PyTorch와 TensorFlow를 넘어서는 이유와 PySpark가 이를 달성하는 데 어떻게 도움이 되는지 자세히 살펴보겠습니다.
진짜 문제: 빅 데이터
온라인에서 볼 수 있는 대부분의 ML 예제는 작고 관리 가능한 데이터 세트를 사용합니다. 모든 것을 메모리에 넣고, 가지고 놀고, 몇 분 안에 모델을 훈련할 수 있습니다. 그러나 신용 카드 사기 감지, 추천 시스템, 재무 예측과 같은 실제 시나리오에서는 수백만 또는 수십억 개의 행을 처리하게 됩니다. 갑자기 노트북이나 서버가 이를 처리할 수 없게 되었습니다.
모든 데이터를 PyTorch 또는 TensorFlow에 한 번에 로드하려고 하면 문제가 발생합니다. 이러한 프레임워크는 대규모 데이터 세트를 효율적으로 처리하기 위한 것이 아니라 모델 교육용으로 설계되었습니다. 이것이 바로 분산 컴퓨팅이 중요한 부분입니다.
PyTorch와 TensorFlow가 충분하지 않은 이유
PyTorch와 TensorFlow는 모델 구축 및 최적화에 적합하지만 대규모 데이터 작업을 처리할 때는 부족합니다. 두 가지 주요 문제:
이것이 PySpark가 빛나는 곳입니다. 분산 데이터를 사용하여 여러 시스템에서 효율적으로 처리하는 동시에 시스템 충돌 없이 대규모 데이터 세트를 처리하도록 설계되었습니다.
실제 사례: PySpark를 사용한 신용카드 사기 탐지
예를 들어 보겠습니다. 신용 카드 거래 데이터를 사용하는 사기 탐지 시스템을 작업하고 있다고 가정해 보겠습니다. 이 경우 Kaggle의 인기 있는 데이터 세트를 사용하겠습니다. 여기에는 284,000건이 넘는 거래가 포함되어 있으며 그 중 사기성 거래는 1% 미만입니다.
1단계: Google Colab에서 PySpark 설정
최소한의 설정으로 PySpark를 실행할 수 있는 Google Colab을 사용하겠습니다.
!pip install pyspark
다음으로 필요한 라이브러리를 가져오고 Spark 세션을 시작하세요.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
pyspark 세션 시작
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
2단계: 데이터 로드 및 준비
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
3단계: 모델 초기화
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
4단계: 적합, 교차 검증 실행, 가장 적합한 매개변수 세트 선택
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
5단계 정밀도, 재현율, F1 점수를 계산하는 함수
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
6단계: 모델에 가장 적합한 임계값 찾기
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
7단계: 보이지 않는 데이터 평가
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
결과
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
그런 다음 이 모델(몇 KB)을 저장하고 pyspark 파이프라인 어디에서나 사용할 수 있습니다.
rf_model.save()
실제 기계 학습 작업에서 대규모 데이터 세트를 처리할 때 PySpark가 큰 차이를 만드는 이유는 다음과 같습니다.
손쉬운 확장: PySpark는 클러스터 전체에 작업을 분산하여 메모리 부족 없이 테라바이트 규모의 데이터를 처리할 수 있습니다.
즉각적인 데이터 처리: PySpark는 전체 데이터 세트를 메모리에 로드할 필요가 없습니다. 필요에 따라 데이터를 처리하므로 훨씬 더 효율적입니다.
더 빠른 모델 교육: 분산 컴퓨팅을 사용하면 여러 시스템에 컴퓨팅 워크로드를 분산하여 모델을 더 빠르게 교육할 수 있습니다.
최종 생각
PyTorch와 TensorFlow는 머신러닝 모델을 구축하기 위한 환상적인 도구이지만 실제 대규모 작업에는 더 많은 것이 필요합니다. PySpark를 사용한 분산 컴퓨팅을 사용하면 대규모 데이터세트를 효율적으로 처리하고 실시간으로 데이터를 처리하며 기계 학습 파이프라인을 확장할 수 있습니다.
따라서 다음에 사기 탐지, 추천 시스템, 재무 분석 등 대규모 데이터를 사용하여 작업할 때 PySpark를 사용하여 프로젝트를 한 단계 더 발전시키는 것을 고려해 보세요.
전체 코드와 결과를 보려면 이 노트북을 확인하세요. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
저는 Swapnil입니다. 결과와 아이디어에 대한 의견을 남기시거나 데이터, 소프트웨어 개발 작업 및 채용 정보에 대해 [email protected]로 저에게 메시지를 보내주세요.
부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.
Copyright© 2022 湘ICP备2022001581号-3