J'avais besoin chez un client de réduire le coût des processus exécutés sur Databricks. L'une des fonctionnalités dont Databricks était responsable était de collecter des fichiers de divers SFTP, de les décompresser et de les placer dans le Data Lake.
L'automatisation des flux de travail de données est un élément crucial de l'ingénierie des données moderne. Dans cet article, nous allons explorer comment créer une fonction AWS Lambda à l'aide de GitLab CI/CD et Terraform qui permet à une application Go de se connecter à un serveur SFTP, de collecter des fichiers, de les stocker dans Amazon S3 et enfin de déclencher une tâche sur Databricks. Ce processus de bout en bout est essentiel pour les systèmes qui reposent sur une intégration et une automatisation efficaces des données.
Commencez par créer une application Go qui se connectera au serveur SFTP pour collecter les fichiers. Utilisez des packages tels que github.com/pkg/sftp pour établir la connexion SFTP et github.com/aws/aws-sdk-go pour interagir avec le service AWS S3.
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
Terraform sera utilisé pour provisionner la fonction Lambda et les ressources requises sur AWS. Créez un fichier main.tf avec la configuration requise pour créer la fonction Lambda, les stratégies IAM et les compartiments S3.
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
Dans GitLab, définissez le pipeline CI/CD dans le fichier .gitlab-ci.yml. Ce pipeline doit inclure des étapes pour tester l'application Go, exécuter Terraform pour provisionner l'infrastructure et une étape de nettoyage si nécessaire.
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
Après avoir téléchargé les fichiers sur S3, la fonction Lambda doit déclencher une tâche dans Databricks. Cela peut être fait à l'aide de l'API Databricks pour lancer des tâches existantes.
package main import ( "bytes" "encoding/json" "fmt" "net/http" ) // Estrutura para a requisição de iniciar um job no Databricks type DatabricksJobRequest struct { JobID int `json:"job_id"` } // Função para acionar um job no Databricks func triggerDatabricksJob(databricksInstance string, token string, jobID int) error { url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance) requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID}) req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode) } return nil } func main() { // ... (código existente para conectar ao SFTP e carregar no S3) // Substitua pelos seus valores reais databricksInstance := "your-databricks-instance" databricksToken := "your-databricks-token" databricksJobID := 123 // ID do job que você deseja acionar // Acionar o job no Databricks após o upload para o S3 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID) if err != nil { log.Fatal("Erro ao acionar o job do Databricks:", err) } fmt.Println("Job do Databricks acionado com sucesso") }
Poussez le code vers le référentiel GitLab pour que le pipeline s'exécute. Vérifiez que toutes les étapes sont terminées avec succès et que la fonction Lambda est opérationnelle et interagit correctement avec S3 et Databricks.
Une fois que vous avez configuré le code complet et le fichier .gitlab-ci.yml, vous pouvez exécuter le pipeline en suivant ces étapes :
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master ´´´
N'oubliez pas que vous devrez configurer des variables d'environnement dans GitLab CI/CD pour stocker des informations sensibles telles que les jetons d'accès et les clés privées. Cela peut être fait dans la section « Paramètres » > « CI/CD » > « Variables » de votre projet GitLab.
Assurez-vous également que le jeton Databricks dispose des autorisations nécessaires pour déclencher des tâches et que la tâche existe avec l'ID fourni.
L'automatisation des tâches d'ingénierie des données peut être considérablement simplifiée à l'aide d'outils tels que GitLab CI/CD, Terraform et AWS Lambda. En suivant les étapes décrites dans cet article, vous pouvez créer un système robuste qui automatise la collecte de données et l'intégration entre SFTP, S3 et Databricks, le tout avec l'efficacité et la simplicité de Go. Avec cette approche, vous serez bien équipé pour aborder le problème. défis de l'intégration des données à grande échelle.
Mes contacts :
LinkedIn - Airton Lira Junior
iMasters - Airton Lira Junior
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3