मुझे डेटाब्रिक्स पर चलने वाली प्रक्रियाओं की लागत को कम करने के लिए एक क्लाइंट की आवश्यकता थी। डेटाब्रिक्स जिन सुविधाओं के लिए ज़िम्मेदार था उनमें से एक थी विभिन्न एसएफटीपी से फ़ाइलें एकत्र करना, उन्हें डीकंप्रेस करना और उन्हें डेटा लेक में रखना।
आधुनिक डेटा इंजीनियरिंग में डेटा वर्कफ़्लो को स्वचालित करना एक महत्वपूर्ण घटक है। इस लेख में, हम GitLab CI/CD और टेराफॉर्म का उपयोग करके AWS लैम्ब्डा फ़ंक्शन बनाने का तरीका जानेंगे जो एक Go एप्लिकेशन को SFTP सर्वर से कनेक्ट करने, फ़ाइलें एकत्र करने, उन्हें Amazon S3 में संग्रहीत करने और अंत में डेटाब्रिक्स पर एक कार्य ट्रिगर करने की अनुमति देता है। यह एंड-टू-एंड प्रक्रिया उन प्रणालियों के लिए आवश्यक है जो कुशल डेटा एकीकरण और स्वचालन पर निर्भर हैं।
एक गो एप्लिकेशन बनाकर शुरुआत करें जो फ़ाइलें एकत्र करने के लिए एसएफटीपी सर्वर से कनेक्ट होगी। SFTP कनेक्शन स्थापित करने के लिए github.com/pkg/sftp और AWS S3 सेवा के साथ इंटरैक्ट करने के लिए github.com/aws/aws-sdk-go जैसे पैकेज का उपयोग करें।
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
एडब्ल्यूएस पर लैम्ब्डा फ़ंक्शन और आवश्यक संसाधनों का प्रावधान करने के लिए टेराफॉर्म का उपयोग किया जाएगा। लैम्ब्डा फ़ंक्शन, IAM नीतियों और S3 बकेट बनाने के लिए आवश्यक कॉन्फ़िगरेशन के साथ एक main.tf फ़ाइल बनाएं।
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
GitLab में, .gitlab-ci.yml फ़ाइल में CI/CD पाइपलाइन को परिभाषित करें। इस पाइपलाइन में गो एप्लिकेशन का परीक्षण करने, बुनियादी ढांचे का प्रावधान करने के लिए टेराफॉर्म चलाने और यदि आवश्यक हो तो सफाई के लिए एक कदम शामिल होना चाहिए।
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
फ़ाइलों को S3 पर अपलोड करने के बाद, लैम्ब्डा फ़ंक्शन को डेटाब्रिक्स में एक कार्य को ट्रिगर करना होगा। यह मौजूदा नौकरियों को लॉन्च करने के लिए डेटाब्रिक्स एपीआई का उपयोग करके किया जा सकता है।
package main import ( "bytes" "encoding/json" "fmt" "net/http" ) // Estrutura para a requisição de iniciar um job no Databricks type DatabricksJobRequest struct { JobID int `json:"job_id"` } // Função para acionar um job no Databricks func triggerDatabricksJob(databricksInstance string, token string, jobID int) error { url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance) requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID}) req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode) } return nil } func main() { // ... (código existente para conectar ao SFTP e carregar no S3) // Substitua pelos seus valores reais databricksInstance := "your-databricks-instance" databricksToken := "your-databricks-token" databricksJobID := 123 // ID do job que você deseja acionar // Acionar o job no Databricks após o upload para o S3 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID) if err != nil { log.Fatal("Erro ao acionar o job do Databricks:", err) } fmt.Println("Job do Databricks acionado com sucesso") }
पाइपलाइन चलाने के लिए कोड को GitLab रिपॉजिटरी में पुश करें। सत्यापित करें कि सभी चरण सफलतापूर्वक पूरे हो गए हैं और लैम्ब्डा फ़ंक्शन चालू है और S3 और डेटाब्रिक्स के साथ सही ढंग से इंटरैक्ट कर रहा है।
एक बार जब आपके पास पूरा कोड और .gitlab-ci.yml फ़ाइल कॉन्फ़िगर हो जाए, तो आप इन चरणों का पालन करके पाइपलाइन चला सकते हैं:
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master
git add . git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks" git push origin master ´´´
याद रखें कि एक्सेस टोकन और निजी कुंजी जैसी संवेदनशील जानकारी संग्रहीत करने के लिए आपको GitLab CI/CD में पर्यावरण चर को कॉन्फ़िगर करने की आवश्यकता होगी। यह आपके GitLab प्रोजेक्ट के 'सेटिंग्स' > 'CI/CD' > 'वेरिएबल्स' अनुभाग में किया जा सकता है।
इसके अलावा, सुनिश्चित करें कि डेटाब्रिक्स टोकन के पास नौकरियों को ट्रिगर करने के लिए आवश्यक अनुमतियां हैं और नौकरी प्रदान की गई आईडी के साथ मौजूद है।
GitLab CI/CD, टेराफॉर्म और AWS लैम्ब्डा जैसे टूल का उपयोग करके डेटा इंजीनियरिंग कार्यों के स्वचालन को काफी सरल बनाया जा सकता है। इस आलेख में उल्लिखित चरणों का पालन करके, आप एक मजबूत प्रणाली बना सकते हैं जो गो की दक्षता और सरलता के साथ एसएफटीपी, एस 3 और डेटाब्रिक्स के बीच डेटा संग्रह और एकीकरण को स्वचालित करती है, इस दृष्टिकोण के साथ, आप इसे संबोधित करने के लिए अच्छी तरह से सुसज्जित होंगे बड़े पैमाने पर डेटा एकीकरण की चुनौतियाँ।
मेरे संपर्क:
लिंक्डइन - एयरटन लीरा जूनियर
आईमास्टर्स - एयरटन लीरा जूनियर
अस्वीकरण: उपलब्ध कराए गए सभी संसाधन आंशिक रूप से इंटरनेट से हैं। यदि आपके कॉपीराइट या अन्य अधिकारों और हितों का कोई उल्लंघन होता है, तो कृपया विस्तृत कारण बताएं और कॉपीराइट या अधिकारों और हितों का प्रमाण प्रदान करें और फिर इसे ईमेल पर भेजें: [email protected] हम इसे आपके लिए यथाशीघ्र संभालेंगे।
Copyright© 2022 湘ICP备2022001581号-3