كانت لدي حاجة لدى العميل لتقليل تكلفة العمليات التي يتم تشغيلها على Databricks. إحدى الميزات التي كانت Databricks مسؤولة عنها هي جمع الملفات من SFTP المختلفة وفك ضغطها ووضعها في Data Lake.
تعد أتمتة سير عمل البيانات عنصرًا حاسمًا في هندسة البيانات الحديثة. في هذه المقالة، سنستكشف كيفية إنشاء وظيفة AWS Lambda باستخدام GitLab CI/CD وTerraform التي تسمح لتطبيق Go بالاتصال بخادم SFTP، وجمع الملفات، وتخزينها في Amazon S3، وأخيرًا تشغيل مهمة على Databricks. تعد هذه العملية الشاملة ضرورية للأنظمة التي تعتمد على تكامل البيانات والأتمتة بكفاءة.
ابدأ بإنشاء تطبيق Go الذي سيتصل بخادم SFTP لجمع الملفات. استخدم حزمًا مثل github.com/pkg/sftp لإنشاء اتصال SFTP وgithub.com/aws/aws-sdk-go للتفاعل مع خدمة AWS S3.
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
سيتم استخدام Terraform لتوفير وظيفة Lambda والموارد المطلوبة على AWS. أنشئ ملف main.tf بالتكوين المطلوب لإنشاء وظيفة Lambda وسياسات IAM ومستودعات S3.
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
في GitLab، حدد مسار CI/CD في ملف .gitlab-ci.yml. يجب أن يتضمن هذا المسار خطوات لاختبار تطبيق Go، وتشغيل Terraform لتوفير البنية التحتية، وخطوة للتنظيف إذا لزم الأمر.
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
بعد تحميل الملفات إلى S3، يجب أن تقوم وظيفة Lambda بتشغيل مهمة في Databricks. ويمكن القيام بذلك باستخدام Databricks API لبدء الوظائف الحالية.
package main import ( "bytes" "encoding/json" "fmt" "net/http" ) // Estrutura para a requisição de iniciar um job no Databricks type DatabricksJobRequest struct { JobID int `json:"job_id"` } // Função para acionar um job no Databricks func triggerDatabricksJob(databricksInstance string, token string, jobID int) error { url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance) requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID}) req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode) } return nil } func main() { // ... (código existente para conectar ao SFTP e carregar no S3) // Substitua pelos seus valores reais databricksInstance := "your-databricks-instance" databricksToken := "your-databricks-token" databricksJobID := 123 // ID do job que você deseja acionar // Acionar o job no Databricks após o upload para o S3 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID) if err != nil { log.Fatal("Erro ao acionar o job do Databricks:", err) } fmt.Println("Job do Databricks acionado com sucesso") }
ادفع الكود إلى مستودع GitLab لتشغيل خط الأنابيب. تأكد من اكتمال جميع الخطوات بنجاح وأن وظيفة Lambda تعمل وتتفاعل بشكل صحيح مع S3 وDatabricks.
بمجرد حصولك على الكود الكامل وتكوين ملف .gitlab-ci.yml، يمكنك تشغيل المسار باتباع الخطوات التالية:
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master ´´´
تذكر أنك ستحتاج إلى تكوين متغيرات البيئة في GitLab CI/CD لتخزين المعلومات الحساسة مثل رموز الوصول والمفاتيح الخاصة. يمكن القيام بذلك في قسم "الإعدادات" > "CI/CD" > "المتغيرات" في مشروع GitLab الخاص بك.
تأكد أيضًا من أن رمز Databricks لديه الأذونات اللازمة لتشغيل المهام وأن المهمة موجودة بالمعرف المقدم.
يمكن تبسيط أتمتة مهام هندسة البيانات بشكل كبير باستخدام أدوات مثل GitLab CI/CD وTerraform وAWS Lambda. باتباع الخطوات الموضحة في هذه المقالة، يمكنك إنشاء نظام قوي يعمل على أتمتة جمع البيانات والتكامل بين SFTP وS3 وDatabricks، وكل ذلك بكفاءة وبساطة Go تحديات تكامل البيانات على نطاق واسع.
جهات الاتصال الخاصة بي:
لينكد إن - إيرتون ليرا جونيور
iMasters - إيرتون ليرا جونيور
تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.
Copyright© 2022 湘ICP备2022001581号-3