」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

發佈於2024-11-08
瀏覽:664

Implementando uma Lambda com GitLab CI/CD e Terraform para Integração SFTP, S Databricks em Go

通过 Databricks 中的流程自动化降低成本

我的客户需要降低在 Databricks 上运行的流程的成本。 Databricks 负责的功能之一是从各种 SFTP 收集文件,解压缩它们并将它们放入数据湖中。

自动化数据工作流程是现代数据工程的重要组成部分。在本文中,我们将探讨如何使用 GitLab CI/CD 和 Terraform 创建 AWS Lambda 函数,该函数允许 Go 应用程序连接到 SFTP 服务器、收集文件、将其存储在 Amazon S3 中,并最终在 Databricks 上触发作业。这种端到端的流程对于依赖高效数据集成和自动化的系统至关重要。

阅读本文需要什么

  • 具有项目存储库的 GitLab 帐户。
  • 有权创建 Lambda、S3 和 IAM 资源的 AWS 账户。
  • 具有创建和运行作业权限的 Databricks 帐户。
  • Go、Terraform 和 GitLab CI/CD 的基础知识。

第 1 步:准备 Go 应用程序

首先创建一个 Go 应用程序,该应用程序将连接到 SFTP 服务器以收集文件。使用 github.com/pkg/sftp 等软件包建立 SFTP 连接,并使用 github.com/aws/aws-sdk-go 与 AWS S3 服务交互。

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // Configuração do cliente SFTP
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.ClientConfig{
  User: user,
  Auth: []ssh.AuthMethod{
   ssh.Password(pass),
  },
  HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 }

 // Conectar ao servidor SFTP
 conn, err := ssh.Dial("tcp", host, config)
 if err != nil {
  log.Fatal(err)
 }
 client, err := sftp.NewClient(conn)
 if err != nil {
  log.Fatal(err)
 }
 defer client.Close()

 // Baixar arquivos do SFTP
 remoteFilePath := "/path/to/remote/file"
 localDir := "/path/to/local/dir"
 localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
 dstFile, err := os.Create(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer dstFile.Close()

 srcFile, err := client.Open(remoteFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer srcFile.Close()

 if _, err := srcFile.WriteTo(dstFile); err != nil {
  log.Fatal(err)
 }

 fmt.Println("Arquivo baixado com sucesso:", localFilePath)

 // Configuração do cliente S3
 sess := session.Must(session.NewSession(&aws.Config{
  Region: aws.String("us-west-2"),
 }))
 uploader := s3manager.NewUploader(sess)

 // Carregar arquivo para o S3
 file, err := os.Open(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 _, err = uploader.Upload(&s3manager.UploadInput{
  Bucket: aws.String("seu-bucket-s3"),
  Key:    aws.String(filepath.Base(localFilePath)),
  Body:   file,
 })
 if err != nil {
  log.Fatal("Falha ao carregar arquivo para o S3:", err)
 }

 fmt.Println("Arquivo carregado com sucesso no S3")
}

步骤 2:配置 Terraform

Terraform 将用于在 AWS 上配置 Lambda 函数和所需资源。使用创建 Lambda 函数、IAM 策略和 S3 存储桶所需的配置创建 main.tf 文件。

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

步骤 3:配置 GitLab CI/CD

在 GitLab 中,在 .gitlab-ci.yml 文件中定义 CI/CD 管道。该管道应包括测试 Go 应用程序、运行 Terraform 来配置基础设施的步骤,以及必要时的清理步骤。

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

第 4 步:与 Databricks 集成

将文件上传到 S3 后,Lambda 函数必须触发 Databricks 中的作业。这可以使用 Databricks API 启动现有作业来完成。

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
 JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
 url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
 requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
 req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
 if err != nil {
  return err
 }

 req.Header.Set("Content-Type", "application/json")
 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

 client := &http.Client{}
 resp, err := client.Do(req)
 if err != nil {
  return err
 }
 defer resp.Body.Close()

 if resp.StatusCode != http.StatusOK {
  return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao SFTP e carregar no S3)

 // Substitua pelos seus valores reais
 databricksInstance := "your-databricks-instance"
 databricksToken := "your-databricks-token"
 databricksJobID := 123 // ID do job que você deseja acionar

 // Acionar o job no Databricks após o upload para o S3
 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
 if err != nil {
  log.Fatal("Erro ao acionar o job do Databricks:", err)
 }

 fmt.Println("Job do Databricks acionado com sucesso")
}

第 5 步:运行管道

将代码推送到 GitLab 存储库以便管道运行。验证所有步骤是否已成功完成,Lambda 函数是否可运行并与 S3 和 Databricks 正确交互。

一旦您拥有完整的代码并配置了 .gitlab-ci.yml 文件,您就可以按照以下步骤运行管道:

  • 将您的代码推送到 GitLab 存储库:
  git add .
  git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
  git push origin master
git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master
´´´

  • GitLab CI/CD 将检测新的提交并自动启动管道。
  • 通过访问存储库的 CI/CD 部分来跟踪 GitLab 中管道的执行情况。
  • 如果所有阶段都成功,您的 Lambda 函数将被部署并可供使用。

请记住,您需要在 GitLab CI/CD 中配置环境变量来存储敏感信息,例如访问令牌和私钥。这可以在 GitLab 项目的“设置”>“CI/CD”>“变量”部分中完成。

此外,请确保 Databricks 令牌具有触发作业所需的权限,并且该作业具有提供的 ID。

结论

使用 GitLab CI/CD、Terraform 和 AWS Lambda 等工具可以显着简化数据工程任务的自动化。通过遵循本文中概述的步骤,您可以创建一个强大的系统,自动执行 SFTP、S3 和 Databricks 之间的数据收集和集成,所有这些都具有 Go 的效率和简单性。通过这种方法,您将有能力解决以下问题。大规模数据集成的挑战。

我的联系人:

LinkedIn - Airton Lira Junior

iMasters - Airton Lira Junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


版本聲明 本文轉載於:https://dev.to/airton_lirajunior_2ddebd/implementando-uma-lambda-com-gitlab-cicd-e-terraform-para-integracao-sftp-s3-e-databricks-em-go-5hc0?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 如何使用 pygame.draw.rect 在 Pygame 中繪製矩形?
    如何使用 pygame.draw.rect 在 Pygame 中繪製矩形?
    用Pygame繪製矩形在Python的Pygame庫中,創建矩形是開發遊戲的基本任務。 對於像 3.2 這樣的 Pygame 版本,繪製矩形涉及使用 pygame.draw.rect 函數。實現此目的的方法如下:導入 Pygame 和基本常數:import pygame, sys from pyga...
    程式設計 發佈於2024-11-08
  • 如何將 CSS 樣式專門套用至 Internet Explorer 7、8 和 9?
    如何將 CSS 樣式專門套用至 Internet Explorer 7、8 和 9?
    僅將CSS 應用到Internet Explorer僅將CSS 應用到Internet Explorer將CSS 的應用限製到特定版本的Internet Explorer,如7、8 和9、利用Microsoft 特定的媒體查詢提供了一個有效的解決方案。 @media screen and (-ms-...
    程式設計 發佈於2024-11-08
  • 為什麼我的 jqGrid 編輯表單中的狀態選擇框選項值不正確?
    為什麼我的 jqGrid 編輯表單中的狀態選擇框選項值不正確?
    jqgrid 在編輯框中選擇下拉選項值不正確儘管在編輯表單中正確選擇了“國家/地區”和“州”值,但“州」選擇框中顯示的選項值編輯表格時不正確。即使在選擇美國國家/地區後切換回英國國家/地區,此問題仍然存在。 如何解決「州」選擇框中不正確的選項值要解決此問題,需要正確載入編輯表單時,根據所選國家/地區...
    程式設計 發佈於2024-11-08
  • 如何在Java中使用堆疊將算術表達式解析為樹結構?
    如何在Java中使用堆疊將算術表達式解析為樹結構?
    在Java 中將算術表達式解析為樹結構從算術表達式創建自定義樹可能是一項具有挑戰性的任務,特別是在確保樹結構時準確反映表達式的操作和優先順序。 要實現這一點,一種有效的方法是使用堆疊。以下是該過程的逐步描述:初始化:從空堆疊開始。 處理令牌:迭代表達式中的每個標記:如果標記是左括號,則將其壓入堆疊。...
    程式設計 發佈於2024-11-08
  • 考慮到特定於平台的結尾,如何有效地從 Java 中的檔案中刪除換行符?
    考慮到特定於平台的結尾,如何有效地從 Java 中的檔案中刪除換行符?
    從Java 中的文件中刪除換行符在Java 中,從文件中刪除換行符可以透過替換所有出現的換行符來實現。但是,考慮特定於平台的行結尾以確保 Windows 和 Linux 之間的兼容性至關重要。 要有效替換換行符,應使用以下步驟:以字串形式讀取檔案: 使用readFileAsString 等函數將文字...
    程式設計 發佈於2024-11-08
  • 如何從內容中刪除 HTML 特殊字元?
    如何從內容中刪除 HTML 特殊字元?
    刪除 HTML 特殊字元在嘗試產生 RSS 原始檔時,您使用了 strip_tags 函數來從應用程式中刪除 HTML 標記。但是,您遇到了一個缺點:strip_tags 無法刪除 HTML 特殊程式碼字符,例如「 」、「&」和「©」。 要解決此問題,請考慮使用替代函數,例如作為 html_enti...
    程式設計 發佈於2024-11-08
  • 如何在Python中透過拆分和刪除字元來解析和清理清單元素?
    如何在Python中透過拆分和刪除字元來解析和清理清單元素?
    如何拆分列表元素並刪除不需要的字符要拆分列表元素並刪除不需要的字符,您可以使用split() 和列表理解技術的組合。以下是如何獲得所需結果:在Python中,split()方法根據指定的分隔符號將字串劃分為列表。預設情況下,它會按空格字元進行分割,但您也可以傳遞可選的分隔符號。 要刪除 \t 字元及...
    程式設計 發佈於2024-11-08
  • Wagtail 以程式設計方式建立頁面翻譯
    Wagtail 以程式設計方式建立頁面翻譯
    我找不到任何程式介面來建立頁面翻譯。所有邏輯似乎都在 wagtail.contrib.simple_translation.views. 的 SubmitTranslationView 中實現 因此,以程式設計方式存取這些內容的唯一方法是模擬存取視圖的請求。我將其包裝在一個名為translate_p...
    程式設計 發佈於2024-11-08
  • Java 切換初學者指南
    Java 切換初學者指南
    Java Switching 在 Java 中引入了 Switch 語句的概念,為複雜的 if-else 鏈提供了替代方案。 Switch 語句可讓您將一個變數與多個可能的值進行比較,從而使您的程式碼更有效率且可讀。 本指南涵蓋了語法、不同資料類型的用法以及在 Java 中實現 switch 語...
    程式設計 發佈於2024-11-08
  • 如何修正 getMonth() 函數以在 JavaScript 中傳回正確的月份?
    如何修正 getMonth() 函數以在 JavaScript 中傳回正確的月份?
    JavaScript 中的getMonth() 函數傳回上個月在JavaScript 中,getMonth() 方法傳回指定日期的月份,從從0(一月)開始。但是,當與格式為“Sun Jul 7 00:00:00 EDT 2013”​​的日期一起使用時,它可以提供上個月而不是預期的月份。 這是因為 g...
    程式設計 發佈於2024-11-08
  • 用 javascript 創建一個打字遊戲來測量 wpm 速度
    用 javascript 創建一個打字遊戲來測量 wpm 速度
    用 JavaScript 創造一個快速打字遊戲 在本教程中,我們將創建一個簡單的打字遊戲,以 wpm 為單位測量我們的打字速度 - 每分鐘單字數、每分鐘字元數、拼寫錯誤,並允許改進它。我們將只使用 javascript 和 jQuery(這並不是真正需要的,但它會讓我們的打字遊戲變...
    程式設計 發佈於2024-11-08
  • 如何在 Pandas 中高效處理大型資料幀:將其分塊!
    如何在 Pandas 中高效處理大型資料幀:將其分塊!
    Pandas - 將大型資料幀切成塊當嘗試處理超大資料幀時,常見的障礙是可怕的記憶體錯誤。一種有效的解決方案是將資料幀劃分為更小的、可管理的區塊。這種策略不僅減少了記憶體消耗,而且有利於高效處理。 要實現這一點,我們可以利用列表理解或 NumPy array_split 函數。 列表理解n = 20...
    程式設計 發佈於2024-11-08
  • 提高 PHP 效率:經過驗證的效能最佳化技術
    提高 PHP 效率:經過驗證的效能最佳化技術
    优化 PHP 性能可确保我们的 Web 应用程序平稳运行、快速响应并有效处理流量。下面是关于如何有效地最大化 PHP 性能的详细分步指南,并为每个优化策略提供了实践示例。 第 1 部分:更新到最新的稳定 PHP 版本 第 1 步:检查当前 PHP 版本 首先检查系统上...
    程式設計 發佈於2024-11-08
  • 如何在 JavaScript 中實現簡潔可靠的 Cookie 讀取?
    如何在 JavaScript 中實現簡潔可靠的 Cookie 讀取?
    在JavaScript 中實作Cookie 讀取簡潔性在JavaScript 程式設計中經常需要存取cookie,通常需要為此包含一個函數目的。雖然 QuirksMode.org 中的 readCookie() 方法一直是一種流行的後備方法,但它既冗長又麻煩。 jQuery.cookie 採用的方法...
    程式設計 發佈於2024-11-08
  • 延遲載入腳本,例如映像
    延遲載入腳本,例如映像
    近年來html 的最佳改進之一是loading="lazy" 屬性,您可以將其添加到圖像(也包括iframe)中,該屬性將告訴瀏覽器不要載入圖像直到出現在視口中。 <img src="/images/your-image.png" loading="lazy"&g...
    程式設計 發佈於2024-11-08

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3