E como o PySpark pode ajudá-lo a lidar com enormes conjuntos de dados como um profissional
Estruturas de aprendizado de máquina como PyTorch e TensorFlow são incríveis para construir modelos. Mas a realidade é que, quando se trata de projetos do mundo real – onde você lida com conjuntos de dados gigantescos – você precisa de mais do que apenas um bom modelo. Você precisa de uma maneira de processar e gerenciar com eficiência todos esses dados. É aí que a computação distribuída, como o PySpark, entra para salvar o dia.
Vamos explicar por que lidar com big data no aprendizado de máquina do mundo real significa ir além do PyTorch e do TensorFlow e como o PySpark ajuda você a chegar lá.
O verdadeiro problema: Big Data
A maioria dos exemplos de ML que você vê online usa conjuntos de dados pequenos e gerenciáveis. Você pode guardar tudo na memória, brincar com ele e treinar um modelo em minutos. Mas em cenários do mundo real, como detecção de fraudes de cartão de crédito, sistemas de recomendação ou previsões financeiras, você está lidando com milhões ou até bilhões de linhas. De repente, seu laptop ou servidor não consegue lidar com isso.
Se você tentar carregar todos esses dados no PyTorch ou TensorFlow de uma vez, tudo irá falhar. Essas estruturas são projetadas para treinamento de modelos, não para lidar com grandes conjuntos de dados de maneira eficiente. É aqui que a computação distribuída se torna crucial.
Por que PyTorch e TensorFlow não são suficientes
PyTorch e TensorFlow são ótimos para construir e otimizar modelos, mas ficam aquém ao lidar com tarefas de dados em grande escala. Dois problemas principais:
É aqui que o PySpark brilha. Ele foi projetado para funcionar com dados distribuídos, processando-os de forma eficiente em várias máquinas e, ao mesmo tempo, lidando com conjuntos de dados massivos sem travar o sistema.
Exemplo do mundo real: detecção de fraude de cartão de crédito com PySpark
Vamos mergulhar em um exemplo. Suponha que você esteja trabalhando em um sistema de detecção de fraudes usando dados de transações de cartão de crédito. Neste caso, usaremos um conjunto de dados popular do Kaggle. Ele contém mais de 284.000 transações e menos de 1% delas são fraudulentas.
Etapa 1: configurar o PySpark no Google Colab
Usaremos o Google Colab para isso porque nos permite executar o PySpark com configuração mínima.
!pip install pyspark
Em seguida, importe as bibliotecas necessárias e inicie uma sessão do Spark.
import os from pyspark.sql import SparkSession from pyspark.sql.functions import col, sum, udf from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler from pyspark.ml.classification import RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors import numpy as np from pyspark.sql.types import FloatType
iniciar uma sessão do pyspark
spark = SparkSession.builder \ .appName("FraudDetectionImproved") \ .master("local[*]") \ .config("spark.executorEnv.PYTHONHASHSEED", "0") \ .getOrCreate()
Etapa 2: carregar e preparar dados
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True) data = data.orderBy("Time") # Ensure data is sorted by time data.show(5) data.describe().show()
# Check for missing values in each column data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show() # Prepare the feature columns feature_columns = data.columns feature_columns.remove("Class") # Removing "Class" column as it is our label # Assemble features into a single vector assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") data = assembler.transform(data) data.select("features", "Class").show(5) # Split data into train (60%), test (20%), and unseen (20%) train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42) test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42) # Print class distribution in each dataset print("Train Data:") train_data.groupBy("Class").count().show() print("Test and parameter optimisation Data:") test_data.groupBy("Class").count().show() print("Unseen Data:") unseen_data.groupBy("Class").count().show()
Etapa 3: inicializar modelo
# Initialize RandomForestClassifier rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability") # Create ParamGrid for Cross Validation paramGrid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20 ]) \ .addGrid(rf.maxDepth, [5, 10]) \ .build() # Create 5-fold CrossValidator crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"), numFolds=5)
Etapa 4: ajuste, execute a validação cruzada e escolha o melhor conjunto de parâmetros
# Run cross-validation, and choose the best set of parameters rf_model = crossval.fit(train_data) # Make predictions on test data predictions_rf = rf_model.transform(test_data) # Evaluate Random Forest Model binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC") pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR") auc_rf = binary_evaluator.evaluate(predictions_rf) auprc_rf = pr_evaluator.evaluate(predictions_rf) print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}") # UDF to extract positive probability from probability vector extract_prob = udf(lambda prob: float(prob[1]), FloatType()) predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Etapa 5 Função para calcular precisão, recall e pontuação F1
# Function to calculate precision, recall, and F1-score def calculate_metrics(predictions): tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count() fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count() fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count() precision = tp / (tp fp) if (tp fp) != 0 else 0 recall = tp / (tp fn) if (tp fn) != 0 else 0 f1_score = (2 * precision * recall) / (precision recall) if (precision recall) != 0 else 0 return precision, recall, f1_score
Etapa 6: Encontre o melhor limite para o modelo
# Find the best threshold for the model best_threshold = 0.5 best_f1 = 0 for threshold in np.arange(0.1, 0.9, 0.1): thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double")) precision, recall, f1 = calculate_metrics(thresholded_predictions) if f1 > best_f1: best_f1 = f1 best_threshold = threshold print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Etapa 7: avaliar dados não vistos
# Evaluate on unseen data predictions_unseen = rf_model.transform(unseen_data) auc_unseen = binary_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {auc_unseen:.4f}") precision, recall, f1 = calculate_metrics(predictions_unseen) print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}") area_under_roc = binary_evaluator.evaluate(predictions_unseen) area_under_pr = pr_evaluator.evaluate(predictions_unseen) print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
RESULTADOS
Best threshold: 0.30000000000000004, Best F1-score: 0.9062 Unseen Data - AUC: 0.9384 Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485 Unseen Data - AUC: 0.9423, AUPRC: 0.8618
Você pode então salvar este modelo (alguns KBs) e usá-lo em qualquer lugar no pipeline do pyspark
rf_model.save()
Veja por que o PySpark faz uma enorme diferença ao lidar com grandes conjuntos de dados em tarefas de aprendizado de máquina do mundo real:
É facilmente escalável: o PySpark pode distribuir tarefas entre clusters, permitindo processar terabytes de dados sem ficar sem memória.
Processamento de dados dinâmico: o PySpark não precisa carregar todo o conjunto de dados na memória. Ele processa os dados conforme necessário, o que o torna muito mais eficiente.
Treinamento de modelo mais rápido: com a computação distribuída, você pode treinar modelos mais rapidamente, distribuindo a carga de trabalho computacional entre várias máquinas.
Considerações Finais
PyTorch e TensorFlow são ferramentas fantásticas para construir modelos de aprendizado de máquina, mas para tarefas de grande escala do mundo real, você precisa de mais. A computação distribuída com PySpark permite que você lide com grandes conjuntos de dados com eficiência, processe dados em tempo real e dimensione seus pipelines de aprendizado de máquina.
Portanto, da próxima vez que você trabalhar com dados massivos - seja detecção de fraudes, sistemas de recomendação ou análise financeira - considere usar o PySpark para levar seu projeto para o próximo nível.
Para obter o código completo e os resultados, confira este notebook. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
Eu sou Swapnil, fique à vontade para deixar seus comentários, resultados e ideias, ou envie-me um ping - [email protected] para trabalhos e trabalhos de desenvolvimento de dados e software
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3