Tenía la necesidad de un cliente de reducir el costo de los procesos que se ejecutaban en Databricks. Una de las características de las que era responsable Databricks era recopilar archivos de varios SFTP, descomprimirlos y colocarlos en Data Lake.
La automatización de los flujos de trabajo de datos es un componente crucial en la ingeniería de datos moderna. En este artículo, exploraremos cómo crear una función AWS Lambda utilizando GitLab CI/CD y Terraform que permite que una aplicación Go se conecte a un servidor SFTP, recopile archivos, los almacene en Amazon S3 y, finalmente, active un trabajo en Databricks. Este proceso de extremo a extremo es esencial para los sistemas que dependen de una integración y automatización de datos eficientes.
Comience creando una aplicación Go que se conectará al servidor SFTP para recopilar archivos. Utilice paquetes como github.com/pkg/sftp para establecer la conexión SFTP y github.com/aws/aws-sdk-go para interactuar con el servicio AWS S3.
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
Terraform se utilizará para aprovisionar la función Lambda y los recursos necesarios en AWS. Cree un archivo main.tf con la configuración necesaria para crear la función Lambda, las políticas de IAM y los depósitos de S3.
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
En GitLab, defina la canalización de CI/CD en el archivo .gitlab-ci.yml. Este proceso debe incluir pasos para probar la aplicación Go, ejecutar Terraform para aprovisionar la infraestructura y un paso para la limpieza si es necesario.
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
Después de cargar los archivos en S3, la función Lambda debe activar un trabajo en Databricks. Esto se puede hacer usando la API de Databricks para iniciar trabajos existentes.
package main import ( "bytes" "encoding/json" "fmt" "net/http" ) // Estrutura para a requisição de iniciar um job no Databricks type DatabricksJobRequest struct { JobID int `json:"job_id"` } // Função para acionar um job no Databricks func triggerDatabricksJob(databricksInstance string, token string, jobID int) error { url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance) requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID}) req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode) } return nil } func main() { // ... (código existente para conectar ao SFTP e carregar no S3) // Substitua pelos seus valores reais databricksInstance := "your-databricks-instance" databricksToken := "your-databricks-token" databricksJobID := 123 // ID do job que você deseja acionar // Acionar o job no Databricks após o upload para o S3 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID) if err != nil { log.Fatal("Erro ao acionar o job do Databricks:", err) } fmt.Println("Job do Databricks acionado com sucesso") }
Envíe el código al repositorio de GitLab para que se ejecute la canalización. Verifique que todos los pasos se completen correctamente y que la función Lambda esté operativa e interactuando correctamente con S3 y Databricks.
Una vez que tenga el código completo y el archivo .gitlab-ci.yml configurado, puede ejecutar la canalización siguiendo estos pasos:
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master ´´´
Recuerde que necesitará configurar variables de entorno en GitLab CI/CD para almacenar información confidencial como tokens de acceso y claves privadas. Esto se puede hacer en la sección 'Configuración' > 'CI/CD' > 'Variables' de tu proyecto GitLab.
Además, asegúrese de que el token de Databricks tenga los permisos necesarios para activar trabajos y que el trabajo exista con el ID proporcionado.
La automatización de las tareas de ingeniería de datos se puede simplificar significativamente utilizando herramientas como GitLab CI/CD, Terraform y AWS Lambda. Si sigue los pasos descritos en este artículo, puede crear un sistema sólido que automatice la recopilación de datos y la integración entre SFTP, S3 y Databricks, todo con la eficiencia y simplicidad de Go. Con este enfoque, estará bien equipado para abordar los problemas. desafíos de la integración de datos a escala.
Mis contactos:
LinkedIn - Airton Lira Junior
iMasters - Airton Lira Junior
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3