”工欲善其事,必先利其器。“—孔子《论语.录灵公》
首页 > 编程 > 使用 GitLab CI/CD 和 Terraform 实现 Lambda 以进行 SFTP 集成、Go 中的 S Databricks

使用 GitLab CI/CD 和 Terraform 实现 Lambda 以进行 SFTP 集成、Go 中的 S Databricks

发布于2024-11-09
浏览:880

Implementando uma Lambda com GitLab CI/CD e Terraform para Integração SFTP, S Databricks em Go

通过 Databricks 中的流程自动化降低成本

我的客户需要降低在 Databricks 上运行的流程的成本。 Databricks 负责的功能之一是从各种 SFTP 收集文件,解压缩它们并将它们放入数据湖中。

自动化数据工作流程是现代数据工程的重要组成部分。在本文中,我们将探讨如何使用 GitLab CI/CD 和 Terraform 创建 AWS Lambda 函数,该函数允许 Go 应用程序连接到 SFTP 服务器、收集文件、将其存储在 Amazon S3 中,并最终在 Databricks 上触发作业。这种端到端的流程对于依赖高效数据集成和自动化的系统至关重要。

阅读本文需要什么

  • 具有项目存储库的 GitLab 帐户。
  • 有权创建 Lambda、S3 和 IAM 资源的 AWS 账户。
  • 具有创建和运行作业权限的 Databricks 帐户。
  • Go、Terraform 和 GitLab CI/CD 的基础知识。

第 1 步:准备 Go 应用程序

首先创建一个 Go 应用程序,该应用程序将连接到 SFTP 服务器以收集文件。使用 github.com/pkg/sftp 等软件包建立 SFTP 连接,并使用 github.com/aws/aws-sdk-go 与 AWS S3 服务交互。

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // Configuração do cliente SFTP
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.ClientConfig{
  User: user,
  Auth: []ssh.AuthMethod{
   ssh.Password(pass),
  },
  HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 }

 // Conectar ao servidor SFTP
 conn, err := ssh.Dial("tcp", host, config)
 if err != nil {
  log.Fatal(err)
 }
 client, err := sftp.NewClient(conn)
 if err != nil {
  log.Fatal(err)
 }
 defer client.Close()

 // Baixar arquivos do SFTP
 remoteFilePath := "/path/to/remote/file"
 localDir := "/path/to/local/dir"
 localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
 dstFile, err := os.Create(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer dstFile.Close()

 srcFile, err := client.Open(remoteFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer srcFile.Close()

 if _, err := srcFile.WriteTo(dstFile); err != nil {
  log.Fatal(err)
 }

 fmt.Println("Arquivo baixado com sucesso:", localFilePath)

 // Configuração do cliente S3
 sess := session.Must(session.NewSession(&aws.Config{
  Region: aws.String("us-west-2"),
 }))
 uploader := s3manager.NewUploader(sess)

 // Carregar arquivo para o S3
 file, err := os.Open(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 _, err = uploader.Upload(&s3manager.UploadInput{
  Bucket: aws.String("seu-bucket-s3"),
  Key:    aws.String(filepath.Base(localFilePath)),
  Body:   file,
 })
 if err != nil {
  log.Fatal("Falha ao carregar arquivo para o S3:", err)
 }

 fmt.Println("Arquivo carregado com sucesso no S3")
}

步骤 2:配置 Terraform

Terraform 将用于在 AWS 上配置 Lambda 函数和所需资源。使用创建 Lambda 函数、IAM 策略和 S3 存储桶所需的配置创建 main.tf 文件。

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

步骤 3:配置 GitLab CI/CD

在 GitLab 中,在 .gitlab-ci.yml 文件中定义 CI/CD 管道。该管道应包括测试 Go 应用程序、运行 Terraform 来配置基础设施的步骤,以及必要时的清理步骤。

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

第 4 步:与 Databricks 集成

将文件上传到 S3 后,Lambda 函数必须触发 Databricks 中的作业。这可以使用 Databricks API 启动现有作业来完成。

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
 JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
 url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
 requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
 req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
 if err != nil {
  return err
 }

 req.Header.Set("Content-Type", "application/json")
 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

 client := &http.Client{}
 resp, err := client.Do(req)
 if err != nil {
  return err
 }
 defer resp.Body.Close()

 if resp.StatusCode != http.StatusOK {
  return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao SFTP e carregar no S3)

 // Substitua pelos seus valores reais
 databricksInstance := "your-databricks-instance"
 databricksToken := "your-databricks-token"
 databricksJobID := 123 // ID do job que você deseja acionar

 // Acionar o job no Databricks após o upload para o S3
 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
 if err != nil {
  log.Fatal("Erro ao acionar o job do Databricks:", err)
 }

 fmt.Println("Job do Databricks acionado com sucesso")
}

第 5 步:运行管道

将代码推送到 GitLab 存储库以便管道运行。验证所有步骤是否已成功完成,Lambda 函数是否可运行并与 S3 和 Databricks 正确交互。

一旦您拥有完整的代码并配置了 .gitlab-ci.yml 文件,您就可以按照以下步骤运行管道:

  • 将您的代码推送到 GitLab 存储库:
  git add .
  git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
  git push origin master
git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master
´´´

  • GitLab CI/CD 将检测新的提交并自动启动管道。
  • 通过访问存储库的 CI/CD 部分来跟踪 GitLab 中管道的执行情况。
  • 如果所有阶段都成功,您的 Lambda 函数将被部署并可供使用。

请记住,您需要在 GitLab CI/CD 中配置环境变量来存储敏感信息,例如访问令牌和私钥。这可以在 GitLab 项目的“设置”>“CI/CD”>“变量”部分中完成。

此外,请确保 Databricks 令牌具有触发作业所需的权限,并且该作业具有提供的 ID。

结论

使用 GitLab CI/CD、Terraform 和 AWS Lambda 等工具可以显着简化数据工程任务的自动化。通过遵循本文中概述的步骤,您可以创建一个强大的系统,自动执行 SFTP、S3 和 Databricks 之间的数据收集和集成,所有这些都具有 Go 的效率和简单性。通过这种方法,您将有能力解决以下问题。大规模数据集成的挑战。

我的联系人:

LinkedIn - Airton Lira Junior

iMasters - Airton Lira Junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


版本声明 本文转载于:https://dev.to/airton_lirajunior_2ddebd/implementando-uma-lambda-com-gitlab-cicd-e-terraform-para-integracao-sftp-s3-e-databricks-em-go-5hc0?1如有侵犯,请联系[email protected]删除
最新教程 更多>
  • 如何使用PHP从XML文件中有效地检索属性值?
    如何使用PHP从XML文件中有效地检索属性值?
    从php 您的目标可能是检索“ varnum”属性值,其中提取数据的传统方法可能会使您感到困惑。 - > attributes()为$ attributeName => $ attributeValue){ echo $ attributeName,'=“',$ at...
    编程 发布于2025-02-19
  • PHP阵列键值异常:了解07和08的好奇情况
    PHP阵列键值异常:了解07和08的好奇情况
    PHP数组键值问题,使用07&08 在给定数月的数组中,键值07和08呈现令人困惑的行为时,就会出现一个不寻常的问题。运行print_r($月份)返回意外结果:键“ 07”丢失,而键“ 08”分配给了9月的值。此问题源于PHP对领先零的解释。当一个数字带有0(例如07或08)的前缀时,PHP将...
    编程 发布于2025-02-19
  • 如何干净地删除匿名JavaScript事件处理程序?
    如何干净地删除匿名JavaScript事件处理程序?
    element.addeventlistener(event,function(){/要解决此问题,请考虑将事件处理程序存储在中心位置,例如页面的主要对象,请考虑将事件处理程序存储在中心位置,否则无法清理匿名事件处理程序。 。这允许在需要时轻松迭代和清洁处理程序。
    编程 发布于2025-02-19
  • 如何使用Python的记录模块实现自定义处理?
    如何使用Python的记录模块实现自定义处理?
    使用Python的Loggging Module 确保正确处理和登录对于疑虑和维护的稳定性至关重要Python应用程序。尽管手动捕获和记录异常是一种可行的方法,但它可能乏味且容易出错。解决此问题,Python允许您覆盖默认的异常处理机制,并将其重定向为登录模块。这提供了一种方便而系统的方法来捕获和...
    编程 发布于2025-02-19
  • 为什么使用Firefox后退按钮时JavaScript执行停止?
    为什么使用Firefox后退按钮时JavaScript执行停止?
    导航历史记录问题:JavaScript使用Firefox Back Back 此行为是由浏览器缓存JavaScript资源引起的。要解决此问题并确保在后续页面访问中执行脚本,Firefox用户应设置一个空功能以在window.onunload事件上调用。 pre> window.onload ...
    编程 发布于2025-02-19
  • 我可以将加密从McRypt迁移到OpenSSL,并使用OpenSSL迁移MCRYPT加密数据?
    我可以将加密从McRypt迁移到OpenSSL,并使用OpenSSL迁移MCRYPT加密数据?
    将我的加密库从mcrypt升级到openssl 问题:是否可以将我的加密库从McRypt升级到OpenSSL?如果是这样?使用openssl? openssl_decrypt()函数要求iv参数的长度与所使用的cipher的块大小相同。 && && && && &&华openssl_decry...
    编程 发布于2025-02-19
  • 对象拟合:IE和Edge中的封面失败,如何修复?
    对象拟合:IE和Edge中的封面失败,如何修复?
    解决此问题,我们采用了一个巧妙的CSS解决方案来解决问题:左:50%; 高度:auto; 宽度:100%; //对于水平块 ,使用绝对定位将图像定位在中心,以object-fit:object-fit:cover in IE和edge消除了问题。现在,图像将按比例扩展,保持所需的效果而不会失真。...
    编程 发布于2025-02-19
  • 如何在JavaScript对象中动态设置键?
    如何在JavaScript对象中动态设置键?
    如何为JavaScript对象变量创建动态键,尝试为JavaScript对象创建动态键,使用此Syntax jsObj['key' i] = 'example' 1;将不起作用。正确的方法采用方括号:他们维持一个长度属性,该属性反映了数字属性(索引)和一个数字属性的数量。标准对象没有模仿这...
    编程 发布于2025-02-19
  • 为什么箭头函数在IE11中引起语法错误?如何修复它们?
    为什么箭头函数在IE11中引起语法错误?如何修复它们?
    为什么arrow functions在IE 11 中引起语法错误。 IE 11不支持箭头函数,导致语法错误。这使用传统函数语法来定义与原始箭头函数相同的逻辑。 IE 11现在将正确识别并执行代码。
    编程 发布于2025-02-19
  • 如何使用PHP将斑点(图像)正确插入MySQL?
    如何使用PHP将斑点(图像)正确插入MySQL?
    在尝试将image存储在mysql数据库中时,您可能会遇到一个可能会遇到问题。本指南将提供成功存储您的图像数据的解决方案。 essue values('$ this-> image_id','file_get_contents($ tmp_image)&#...
    编程 发布于2025-02-19
  • 如何检查对象是否具有Python中的特定属性?
    如何检查对象是否具有Python中的特定属性?
    方法来确定对象属性存在寻求一种方法来验证对象中特定属性的存在。考虑以下示例,其中尝试访问不确定属性会引起错误: >>> a = someClass() >>> A.property Trackback(最近的最新电话): 文件“ ”,第1行, AttributeError:SomeClass实...
    编程 发布于2025-02-19
  • 为什么Microsoft Visual C ++无法正确实现两台模板的实例?
    为什么Microsoft Visual C ++无法正确实现两台模板的实例?
    [2明确担心Microsoft Visual C(MSVC)在正确实现两相模板实例化方面努力努力。该机制的哪些具体方面无法按预期运行?背景:说明:的初始Syntax检查在范围中受到限制。它未能检查是否存在声明名称的存在,导致名称缺乏正确的声明时会导致编译问题。为了说明这一点,请考虑以下示例:一个符合...
    编程 发布于2025-02-19
  • 如何以不同的频率控制Android设备振动?
    如何以不同的频率控制Android设备振动?
    控制使用频率变化的Android设备振动是否想为您的Android应用程序添加触觉元素?了解如何触发设备的振动器至关重要。您可以做到这一点:生成基本振动以生成简单的振动,使用振动器对象:这将导致设备在指定的持续时间内振动。许可要求通过上述技术,您可以创建在您的Android应用程序中自定义振动,以增...
    编程 发布于2025-02-19
  • Java是否允许多种返回类型:仔细研究通用方法?
    Java是否允许多种返回类型:仔细研究通用方法?
    在java中的多个返回类型:一个误解介绍,其中foo是自定义类。该方法声明似乎拥有两种返回类型:列表和E。但是,情况确实如此吗?通用方法:拆开神秘 [方法仅具有单一的返回类型。相反,它采用机制,如钻石符号“ ”。分解方法签名: :本节定义了一个通用类型参数,E。它表示该方法接受了扩展foo类的任...
    编程 发布于2025-02-19
  • 如何在Java字符串中有效替换多个子字符串?
    如何在Java字符串中有效替换多个子字符串?
    Exploiting Regular ExpressionsA more efficient solution involves leveraging regular expressions.正则表达式允许您定义复杂的搜索模式并在单个操作中执行文本转换。示例使用接下来,您可以使用匹配器查找令牌的所...
    编程 发布于2025-02-19

免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。

Copyright© 2022 湘ICP备2022001581号-3