」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > Spring Boot中如何使用RocketMQ實現大量訊息消費

Spring Boot中如何使用RocketMQ實現大量訊息消費

發佈於2024-11-07
瀏覽:690

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1.添加依賴

首先,將必要的依賴項新增至您的 pom.xml 檔案:


org.apache.rocketmqrocketmq-spring-boot-starter2.3.1org.apache.rocketmqrocketmq-clientorg.apache.rocketmqrocketmq-client5.3.0

2.設定檔bootstrap.yaml

在 bootstrap.yaml 檔案中配置您的 RocketMQ 設定:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3.配置類別MqConfigProperties

建立設定類別MqConfigProperties:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 實施消費者守則

建立消費者類別UserConsumer:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}

5. 生產者範例程式碼

要產生批次訊息,您可以使用以下 UserProducer 類別:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List users, String topic) {
        List messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}

6. 額外的優化建議

  • 效能最佳化:可以調整消費者執行緒池的大小。預設情況下,它設定為consumeThreadMin = 20和consumeThreadMax = 20。高並發場景下,增大執行緒池大小可以提升效能。

  • 錯誤處理:消費失敗時,謹慎使用RECONSUME_LATER,避免無限重試循環。根據您的業務需求設定最大重試次數。

  • 租戶隔離:不同的業務模組使用不同的群組,避免消費錯誤群組的資料。這在生產環境中尤其重要。

版本聲明 本文轉載於:https://dev.to/wilson_evan_1efa5910f8855/how-to-implement-batch-message-consumption-with-rocketmq-in-spring-boot-6gi?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 如何使用 std::locale 在 C++ 中使用逗號格式化數字?
    如何使用 std::locale 在 C++ 中使用逗號格式化數字?
    在C 中用逗號格式化數字在C 中,std::locale 類別提供了一種依賴於區域設定的方式用逗號格式化數字.std::locale 與std::stringstream要將數字格式化為帶逗號的字串,可以將std::locale 與std::stringstream 一起使用如下:#include ...
    程式設計 發佈於2024-11-07
  • 如何避免在 Python 中列印素數序列中的奇數?
    如何避免在 Python 中列印素數序列中的奇數?
    如何在 Python 中列印素數序列許多程式設計師都在努力創建一個在 Python 中準確列印素數的函數。一個常見的問題是列印奇數列表。要解決這個問題,必須徹底了解素數屬性並修改程式碼。 質數只能被 1 和它們本身整除。因此,改進的程式碼檢查從 2 到數字的平方根(如果數字較小,則為數字本身)範圍內...
    程式設計 發佈於2024-11-07
  • 如何在 Pygame 中向滑鼠方向發射子彈?
    如何在 Pygame 中向滑鼠方向發射子彈?
    如何在 Pygame 中朝滑鼠方向發射子彈在 Pygame 中,可以創建一顆朝滑鼠方向發射的子彈。為此,需要建立一個代表子彈的類,並根據滑鼠位置設定其初始位置和方向。 子彈的類別首先,為項目符號建立一個類別。該類別應包含子彈的位置、大小和表面的屬性。表面就是將在螢幕上渲染的內容。 import py...
    程式設計 發佈於2024-11-07
  • 優化效能的 GG 編碼技巧:加快程式碼速度
    優化效能的 GG 編碼技巧:加快程式碼速度
    在软件开发领域,优化代码性能对于交付用户喜爱的快速响应的应用程序至关重要。无论您从事前端还是后端工作,学习如何编写高效的代码都是至关重要的。在本文中,我们将探讨各种性能优化技术,例如降低时间复杂度、缓存、延迟加载和并行性。我们还将深入探讨如何分析和优化前端和后端代码。让我们开始提高代码的速度和效率!...
    程式設計 發佈於2024-11-07
  • 如何使用 PHP 的 strtotime() 函數找出一週中特定一天的日期?
    如何使用 PHP 的 strtotime() 函數找出一週中特定一天的日期?
    確定一周中指定日期(星期一、星期二等)的日期如果您需要確定日期戳一周中的特定一天,例如星期一、星期二或任何其他工作日,可以使用strtotime() 函數。當指定日期在本週內尚未出現時,此函數特別有用。 例如,要獲取下週二的日期戳,只需使用以下代碼:strtotime('next tuesday')...
    程式設計 發佈於2024-11-07
  • 使用 Socket.io 和 Redis 建置和部署聊天應用程式。
    使用 Socket.io 和 Redis 建置和部署聊天應用程式。
    在本教程中,我们将使用 Web 套接字构建一个聊天应用程序。当您想要构建需要实时传输数据的应用程序时,Web 套接字非常有用。 在本教程结束时,您将能够设置自己的套接字服务器、实时发送和接收消息、在 Redis 中存储数据以及在渲染和 google cloud run 上部署您的应用程序。 ...
    程式設計 發佈於2024-11-07
  • SQL 連結內部
    SQL 連結內部
    SQL 连接是查询数据库的基础,它允许用户根据指定条件组合多个表中的数据。连接分为两种主要类型:逻辑连接和物理连接。逻辑联接代表组合表中数据的概念方式,而物理联接是指这些联接在数据库系统(例如 RDS(关系数据库服务)或其他 SQL 服务器)中的实际实现。在今天的博文中,我们将揭开 SQL 连接的神...
    程式設計 發佈於2024-11-07
  • 你該知道的 Javascript 特性
    你該知道的 Javascript 特性
    在本文中,我們將探討如何在嘗試存取可能是未定義或null 的資料時防止錯誤,並且我們將介紹您可以使用的方法用於在必要時有效管理資料。 透過可選連結進行安全訪問 在 JavaScript 中,當您嘗試存取嵌套物件中的值或函數時,如果結果為 undefined,您的程式碼可能會引發錯誤...
    程式設計 發佈於2024-11-07
  • JavaScript 中的 Promise:理解、處理和掌握非同步程式碼
    JavaScript 中的 Promise:理解、處理和掌握非同步程式碼
    简介 我曾经是一名 Java 开发人员,我记得第一次接触 JavaScript 中的 Promise 时。尽管这个概念看起来很简单,但我仍然无法完全理解 Promise 是如何工作的。当我开始在项目中使用它们并了解它们解决的案例时,情况发生了变化。然后灵光乍现的时刻到来了,一切都变...
    程式設計 發佈於2024-11-07
  • 如何將金鑰整合到 Java Spring Boot 中
    如何將金鑰整合到 Java Spring Boot 中
    Java Spring Boot 中的密钥简介 密钥提供了一种现代、安全的方式来验证用户身份,而无需依赖传统密码。在本指南中,我们将引导您使用 Thymeleaf 作为模板引擎将密钥集成到 Java Spring Boot 应用程序中。 我们将利用 Corbado 的密钥优先 UI...
    程式設計 發佈於2024-11-07
  • 馬裡奧·羅伯托·羅哈斯·埃斯皮諾擔任危地馬拉前環境部長的影響
    馬裡奧·羅伯托·羅哈斯·埃斯皮諾擔任危地馬拉前環境部長的影響
    作為危地馬拉前環境部長,馬裡奧·羅伯托·羅哈斯·埃斯皮諾在執行環境政策方面發揮了至關重要的作用,為該國的可持續發展做出了貢獻。他作為該部門領導的管理留下了重要的遺產,特別是在環境立法和保護項目方面。在本文中,我們探討了他的影響以及他在任期內推行的主要政策。 主要環境政策 在擔任部長...
    程式設計 發佈於2024-11-07
  • 如何追蹤和存取類別的所有實例以進行資料收集?
    如何追蹤和存取類別的所有實例以進行資料收集?
    追蹤資料收集的類別實例假設您正在接近程式末尾,並且需要從多個變數中提取特定變數來填充字典的類別的實例。當處理包含需要聚合或分析的基本資料的物件時,可能會出現此任務。 為了說明這個問題,請考慮這個簡化的類別結構:class Foo(): def __init__(self): ...
    程式設計 發佈於2024-11-07
  • 如何在 PHP 關聯數組中搜尋 – 快速提示
    如何在 PHP 關聯數組中搜尋 – 快速提示
    關聯數組是 PHP 中的基本資料結構,允許開發人員儲存鍵值對。它們用途廣泛,通常用於表示結構化資料。在 PHP 關聯數組中搜尋特定元素是一項常見任務。但 PHP 中可用的最原生函數可以很好地處理簡單的陣列。 出於這個原因,我們經常必須找到允許我們在關聯數組上執行相同操作的函數組合。可能沒有記憶體不...
    程式設計 發佈於2024-11-07
  • Web 開發的未來:每個開發人員都應該了解的新興趨勢和技術
    Web 開發的未來:每個開發人員都應該了解的新興趨勢和技術
    介绍 Web 开发从早期的静态 HTML 页面和简单的 CSS 设计已经走过了漫长的道路。多年来,在技术进步和用户对更具动态性、交互性和响应性的网站不断增长的需求的推动下,该领域发展迅速。随着互联网成为日常生活中不可或缺的一部分,网络开发人员必须不断适应新趋势和技术,以保持相关性并...
    程式設計 發佈於2024-11-07
  • 初學者 Python 程式設計師可以使用 ChatGPT
    初學者 Python 程式設計師可以使用 ChatGPT
    作为一名 Python 初学者,您面临着无数的挑战,从编写干净的代码到排除错误。 ChatGPT 可以成为您提高生产力和简化编码之旅的秘密武器。您可以直接向 ChatGPT 提问并获得所需的答案,而无需筛选无休止的文档或论坛。无论您是在调试一段棘手的代码、寻找项目灵感,还是寻求复杂概念的解释,Ch...
    程式設計 發佈於2024-11-07

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3