」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

發佈於2024-11-08
瀏覽:347

Implementando uma Lambda com GitLab CI/CD e Terraform para Integração SFTP, S Databricks em Go

通过 Databricks 中的流程自动化降低成本

我的客户需要降低在 Databricks 上运行的流程的成本。 Databricks 负责的功能之一是从各种 SFTP 收集文件,解压缩它们并将它们放入数据湖中。

自动化数据工作流程是现代数据工程的重要组成部分。在本文中,我们将探讨如何使用 GitLab CI/CD 和 Terraform 创建 AWS Lambda 函数,该函数允许 Go 应用程序连接到 SFTP 服务器、收集文件、将其存储在 Amazon S3 中,并最终在 Databricks 上触发作业。这种端到端的流程对于依赖高效数据集成和自动化的系统至关重要。

阅读本文需要什么

  • 具有项目存储库的 GitLab 帐户。
  • 有权创建 Lambda、S3 和 IAM 资源的 AWS 账户。
  • 具有创建和运行作业权限的 Databricks 帐户。
  • Go、Terraform 和 GitLab CI/CD 的基础知识。

第 1 步:准备 Go 应用程序

首先创建一个 Go 应用程序,该应用程序将连接到 SFTP 服务器以收集文件。使用 github.com/pkg/sftp 等软件包建立 SFTP 连接,并使用 github.com/aws/aws-sdk-go 与 AWS S3 服务交互。

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // Configuração do cliente SFTP
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.ClientConfig{
  User: user,
  Auth: []ssh.AuthMethod{
   ssh.Password(pass),
  },
  HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 }

 // Conectar ao servidor SFTP
 conn, err := ssh.Dial("tcp", host, config)
 if err != nil {
  log.Fatal(err)
 }
 client, err := sftp.NewClient(conn)
 if err != nil {
  log.Fatal(err)
 }
 defer client.Close()

 // Baixar arquivos do SFTP
 remoteFilePath := "/path/to/remote/file"
 localDir := "/path/to/local/dir"
 localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
 dstFile, err := os.Create(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer dstFile.Close()

 srcFile, err := client.Open(remoteFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer srcFile.Close()

 if _, err := srcFile.WriteTo(dstFile); err != nil {
  log.Fatal(err)
 }

 fmt.Println("Arquivo baixado com sucesso:", localFilePath)

 // Configuração do cliente S3
 sess := session.Must(session.NewSession(&aws.Config{
  Region: aws.String("us-west-2"),
 }))
 uploader := s3manager.NewUploader(sess)

 // Carregar arquivo para o S3
 file, err := os.Open(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 _, err = uploader.Upload(&s3manager.UploadInput{
  Bucket: aws.String("seu-bucket-s3"),
  Key:    aws.String(filepath.Base(localFilePath)),
  Body:   file,
 })
 if err != nil {
  log.Fatal("Falha ao carregar arquivo para o S3:", err)
 }

 fmt.Println("Arquivo carregado com sucesso no S3")
}

步骤 2:配置 Terraform

Terraform 将用于在 AWS 上配置 Lambda 函数和所需资源。使用创建 Lambda 函数、IAM 策略和 S3 存储桶所需的配置创建 main.tf 文件。

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

步骤 3:配置 GitLab CI/CD

在 GitLab 中,在 .gitlab-ci.yml 文件中定义 CI/CD 管道。该管道应包括测试 Go 应用程序、运行 Terraform 来配置基础设施的步骤,以及必要时的清理步骤。

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

第 4 步:与 Databricks 集成

将文件上传到 S3 后,Lambda 函数必须触发 Databricks 中的作业。这可以使用 Databricks API 启动现有作业来完成。

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
 JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
 url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
 requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
 req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
 if err != nil {
  return err
 }

 req.Header.Set("Content-Type", "application/json")
 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

 client := &http.Client{}
 resp, err := client.Do(req)
 if err != nil {
  return err
 }
 defer resp.Body.Close()

 if resp.StatusCode != http.StatusOK {
  return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao SFTP e carregar no S3)

 // Substitua pelos seus valores reais
 databricksInstance := "your-databricks-instance"
 databricksToken := "your-databricks-token"
 databricksJobID := 123 // ID do job que você deseja acionar

 // Acionar o job no Databricks após o upload para o S3
 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
 if err != nil {
  log.Fatal("Erro ao acionar o job do Databricks:", err)
 }

 fmt.Println("Job do Databricks acionado com sucesso")
}

第 5 步:运行管道

将代码推送到 GitLab 存储库以便管道运行。验证所有步骤是否已成功完成,Lambda 函数是否可运行并与 S3 和 Databricks 正确交互。

一旦您拥有完整的代码并配置了 .gitlab-ci.yml 文件,您就可以按照以下步骤运行管道:

  • 将您的代码推送到 GitLab 存储库:
  git add .
  git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
  git push origin master
git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master
´´´

  • GitLab CI/CD 将检测新的提交并自动启动管道。
  • 通过访问存储库的 CI/CD 部分来跟踪 GitLab 中管道的执行情况。
  • 如果所有阶段都成功,您的 Lambda 函数将被部署并可供使用。

请记住,您需要在 GitLab CI/CD 中配置环境变量来存储敏感信息,例如访问令牌和私钥。这可以在 GitLab 项目的“设置”>“CI/CD”>“变量”部分中完成。

此外,请确保 Databricks 令牌具有触发作业所需的权限,并且该作业具有提供的 ID。

结论

使用 GitLab CI/CD、Terraform 和 AWS Lambda 等工具可以显着简化数据工程任务的自动化。通过遵循本文中概述的步骤,您可以创建一个强大的系统,自动执行 SFTP、S3 和 Databricks 之间的数据收集和集成,所有这些都具有 Go 的效率和简单性。通过这种方法,您将有能力解决以下问题。大规模数据集成的挑战。

我的联系人:

LinkedIn - Airton Lira Junior

iMasters - Airton Lira Junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


版本聲明 本文轉載於:https://dev.to/airton_lirajunior_2ddebd/implementando-uma-lambda-com-gitlab-cicd-e-terraform-para-integracao-sftp-s3-e-databricks-em-go-5hc0?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 了解 JavaScript 產生器:強大的程式碼流控制工具
    了解 JavaScript 產生器:強大的程式碼流控制工具
    生成器是 JavaScript 中最强大的功能之一,它允许我们编写可以根据需要暂停和恢复的代码。与一次执行所有代码的常规函数​​不同,生成器使用延迟执行,增量返回值,从而更容易处理数据序列、迭代或长时间运行的进程。 发电机如何工作? 在JavaScript中,生成器是使用functi...
    程式設計 發佈於2024-11-08
  • 如何在 groupby 作業期間維護 Pandas DataFrame 中的其他欄位?
    如何在 groupby 作業期間維護 Pandas DataFrame 中的其他欄位?
    在Groupby 操作期間維護其他列對pandas 資料框執行groupby 操作時,通常需要保留不屬於的列參與分組或聚合過程。預設情況下,操作完成後將刪除這些其他欄位。如果保留的列包含有價值的信息,這可能會出現問題。 考慮以下資料框: item diff otherstuff ...
    程式設計 發佈於2024-11-08
  • 根據您文章的內容,以下是一些適合問答格式的標題選項:

選項 1(直接):

* 如何從 OpentelemetryContext Pr 中的字串追蹤 ID 建構跨度
    根據您文章的內容,以下是一些適合問答格式的標題選項: 選項 1(直接): * 如何從 OpentelemetryContext Pr 中的字串追蹤 ID 建構跨度
    在 Opentelemetry 中根據追蹤 ID 建構 Span上下文傳播通常用於檢索父追蹤 ID 並建立子 Span。但是,在使用標頭進行訊息交換的情況下,需要替代方法。 要從字串追蹤ID 建立範圍,可以依照下列步驟操作:建構一個解析Trace 和Span ID 的函式:func construc...
    程式設計 發佈於2024-11-08
  • 如何將 JSON 字串轉換為 Python 字典?
    如何將 JSON 字串轉換為 Python 字典?
    如何將JSON 字串轉換為Python 字典JSON(JavaScript 物件表示法)是一種流行的資料格式,通常以於表示複雜的資料結構。在 Python 中,您可以使用 json 模組來處理 JSON 資料。 一個常見的任務是將 JSON 字串轉換為 Python 字典。這允許您以鍵值對的形式存取...
    程式設計 發佈於2024-11-08
  • 治療高血壓的整體阿育吠陀方法
    治療高血壓的整體阿育吠陀方法
    管理高血壓的整體阿育吠陀方法 阿育吠陀治療高血壓提供了一種自然和整體的方法來管理這一常見的健康問題。在阿育吠陀中,高血壓通常與能量的不平衡有關,特別是皮塔和瓦塔。這種古老的醫學體系旨在透過個人化治療和生活方式改變來恢復平衡並促進整體健康。 治療高血壓的阿育吠陀療法包括使用特定草藥,如阿朱那、南非醉...
    程式設計 發佈於2024-11-08
  • 如何設定 Y 軸範圍以豐富多個子圖佈局中的視覺化?
    如何設定 Y 軸範圍以豐富多個子圖佈局中的視覺化?
    設定子圖軸範圍背景在視覺化中處理多個子圖時,有必要控制每個子圖的軸範圍以確保正確的數據表示。本問題探討如何在雙子圖佈局中設定第二個子圖的 y 軸範圍。當 FFT 圖出現異常尖峰,導致所需資料不可見時,就會出現此問題。 解決方案要解決此問題,請在繪圖後使用 pylab.ylim([bottom, to...
    程式設計 發佈於2024-11-08
  • CSS 中的盒子模型:製作精確版面的終極指南
    CSS 中的盒子模型:製作精確版面的終極指南
    Web設計概念中,盒子模型(Box Model)是CSS中非常關注佈局設計的基礎知識領域。盒子模型解釋了頁面上元素的放置和定位方式,這會影響水平度和整體連貫性。 除了概述盒子模型的背景和邊框之外,本指南還指導您完成填充、邊距和盒子大小調整,以便您可以完全控制佈局。要開始專業級的 Web 專案設計,...
    程式設計 發佈於2024-11-08
  • 什麼是語法和語義
    什麼是語法和語義
    如果您正在学习一门语言,您可能听说过“语法”这个词并且一直在处理它。 (该死的语法错误)。 几天前的晚上,我在想,我从来没有认真遵循过编程范式和技术,今天我开始从我经常听到的最小主题中学习(即使我已经记住了)。我创建了这个存储库。我采取的学习路径很有特色(在 LEARNING_LIST.md 文件中...
    程式設計 發佈於2024-11-08
  • 如何在不切換焦點的情況下在背景開啟新分頁?
    如何在不切換焦點的情況下在背景開啟新分頁?
    在後台打開新選項卡而不進行焦點切換在這個問題中,用戶尋求在單獨的選項卡中打開新選項卡不會導致任何焦點轉移到新選項卡。他們示範了使用 open() 和 focus() 方法的嘗試,但在傳回 Chrome 中的目前分頁之前遇到了新分頁的短暫閃爍。 但是,提供的答案提供了使用事件的替代解決方案自訂事件調度...
    程式設計 發佈於2024-11-08
  • 如何使用自訂語句來擴展 Python 語法?
    如何使用自訂語句來擴展 Python 語法?
    為 Python 語法新增語句Python 的語法允許語句定義,例如 print、raise 和 with。雖然這些語句提供了廣泛的功能,但可以擴展此語法以適應自訂語句。 建立自訂語句涉及兩個主要步驟建立自訂語句:修改語法:您需要更新Python 的語法以包含新語句的定義。這涉及到修改 Gramma...
    程式設計 發佈於2024-11-08
  • 使用 PHP 中的服務層模式實現簡潔且可擴展的程式碼
    使用 PHP 中的服務層模式實現簡潔且可擴展的程式碼
    服務層模式是一種流行的設計方法,用於處理 PHP 應用程式中的業務邏輯。透過將應用程式邏輯與框架分離,我們創建了一個可擴展、可測試且可維護的程式碼庫。在本文中,我們將透過實際範例介紹服務層模式的基礎知識、其優點以及如何在 PHP 應用程式中實現它。 什麼是服務層模式? 服務層模式是...
    程式設計 發佈於2024-11-08
  • 您能否在 C++ 中實現高效的二分搜尋演算法,將迭代器返回搜尋結果?
    您能否在 C++ 中實現高效的二分搜尋演算法,將迭代器返回搜尋結果?
    在C 中搜尋高效的二分搜尋演算法程式設計師經常尋求實現高效的二分搜尋演算法,以提供對數時間複雜度的優勢。一個常見的要求是演算法傳回一個指向搜尋結果的迭代器,而不是一個指示其存在的簡單布林值。 C 標準庫在 標頭中提供了 std::binary_search,但它只傳回一個布林值。正如提問者所指出的...
    程式設計 發佈於2024-11-08
  • 使用 Javascript 的心形圖案程式碼
    使用 Javascript 的心形圖案程式碼
    程式碼 let heart = ""; let size = 6; // Loop to print upper part of the heart for (let i = size / 2; i <= size; i = 2) { for (let j = 1;...
    程式設計 發佈於2024-11-08
  • 如何將 PHP 資料庫結果數組轉換為 JSON?
    如何將 PHP 資料庫結果數組轉換為 JSON?
    將 PHP 資料庫結果陣列轉換為 JSON您想要在 PHP 中將資料庫查詢的結果陣列轉換為 JSON 格式。以下程式碼從資料庫中檢索一行:$row = mysql_fetch_array($result);要將$row 陣列轉換為JSON 格式以便與jQuery 外掛一起使用,您需要可以利用json...
    程式設計 發佈於2024-11-08
  • 如何在 Chrome DevTools 中輕鬆識別和監控表單元素事件?
    如何在 Chrome DevTools 中輕鬆識別和監控表單元素事件?
    了解元素互動觸發的事件要在可自訂表單元素上正確識別和處理事件,必須了解互動時觸發的特定事件。 Chrome DevTools 提供了一個強大的工具,monitorEvents,來協助完成此過程。 使用monitorEvents()檢查目標元素: 右鍵單擊該元素並選擇“Inspect”或在DevToo...
    程式設計 發佈於2024-11-08

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3