”工欲善其事,必先利其器。“—孔子《论语.录灵公》
首页 > 编程 > Spring Boot中如何使用RocketMQ实现批量消息消费

Spring Boot中如何使用RocketMQ实现批量消息消费

发布于2024-11-07
浏览:236

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1.添加依赖

首先,将必要的依赖项添加到您的 pom.xml 文件中:


org.apache.rocketmqrocketmq-spring-boot-starter2.3.1org.apache.rocketmqrocketmq-clientorg.apache.rocketmqrocketmq-client5.3.0

2.配置文件bootstrap.yaml

在 bootstrap.yaml 文件中配置您的 RocketMQ 设置:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3.配置类MqConfigProperties

创建配置类MqConfigProperties:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 实施消费者守则

创建消费者类UserConsumer:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}

5. 生产者示例代码

要生成批量消息,您可以使用以下 UserProducer 类:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List users, String topic) {
        List messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}

6. 额外的优化建议

  • 性能优化:可以调整消费者线程池的大小。默认情况下,它设置为consumeThreadMin = 20和consumeThreadMax = 20。高并发场景下,增大线程池大小可以提升性能。

  • 错误处理:消费失败时,谨慎使用RECONSUME_LATER,避免无限重试循环。根据您的业务需求设置最大重试次数。

  • 租户隔离:不同的业务模块使用不同的组,避免消费错误组的数据。这在生产环境中尤其重要。

版本声明 本文转载于:https://dev.to/wilson_evan_1efa5910f8855/how-to-implement-batch-message-consumption-with-rocketmq-in-spring-boot-6gi?1如有侵犯,请联系[email protected]删除
最新教程 更多>
  • Turborepo 与 Nx:哪种 Monorepo 工具适合您?
    Turborepo 与 Nx:哪种 Monorepo 工具适合您?
    随着现代开发变得越来越复杂,monorepos变得越来越流行。它们允许将多个项目或包存储在单个存储库中,从而简化依赖关系管理并促进更好的协作。用于管理 monorepos 的两个顶级工具是 Turborepo 和 Nx。 这两种工具都旨在提高处理单一存储库的效率和可扩展性,但它们具有独特的优势。在本...
    编程 发布于2024-11-07
  • Java 数组简介
    Java 数组简介
    编程通常涉及管理和操作大量数据,对此高效且有效的数据结构至关重要。数组是计算机科学中的基本数据结构,提供了一种存储固定大小的相同类型元素序列的方法。在本博客中,我们将深入了解 Java 中的数组:了解它们是什么、它们的语法、如何对它们进行操作以及它们的内存管理。 为什么我们需要数组?...
    编程 发布于2024-11-07
  • 解决 CORS 问题的方法
    解决 CORS 问题的方法
    要解决 CORS 问题,您需要在 Web 服务器(如 Apache 或 Nginx)、后端(如 Django、Go 或 Node.js)中添加适当的标头,或在前端框架(如 React 或 Next.js)中。以下是每个平台的步骤: 1. 网络服务器 阿帕奇 您可以在 ...
    编程 发布于2024-11-07
  • 内存对齐如何影响 C 结构的大小?
    内存对齐如何影响 C 结构的大小?
    C 结构中的内存对齐使用 C 结构时,理解内存对齐至关重要。内存对齐是指将数据在内存中放置在特定的边界处。在 32 位机器上,内存通常按 4 字节边界对齐。结构的内存对齐考虑以下结构:typedef struct { unsigned short v1; unsigned short...
    编程 发布于2024-11-07
  • 受顶级旅游景点启发构建创新项目:令人难忘的旅行体验开发人员指南
    受顶级旅游景点启发构建创新项目:令人难忘的旅行体验开发人员指南
    作为开发商,我们经常从周围的世界中汲取灵感——还有什么比令人难以置信的旅游景点更好的来源呢?无论您是在开发旅行应用程序、沉浸式体验还是基于位置的服务,了解目的地的脱颖而出都是关键。看看这份关于阿尔巴尼亚最佳旅游景点的终极指南,为您的下一个创意项目提供动力,并了解这些地标如何在现实世界和数字世界中塑造...
    编程 发布于2024-11-07
  • 如何使用 std::locale 在 C++ 中使用逗号格式化数字?
    如何使用 std::locale 在 C++ 中使用逗号格式化数字?
    在 C 中用逗号格式化数字 在 C 中,std::locale 类提供了一种依赖于区域设置的方式用逗号格式化数字.std::locale 与 std::stringstream要将数字格式化为带逗号的字符串,可以将 std::locale 与 std::stringstream 一起使用如下:#in...
    编程 发布于2024-11-07
  • 如何避免在 Python 中打印素数序列中的奇数?
    如何避免在 Python 中打印素数序列中的奇数?
    如何在 Python 中打印素数序列许多程序员都在努力创建一个在 Python 中准确打印素数的函数。一个常见的问题是打印奇数列表。要解决这个问题,必须彻底了解素数属性并修改代码。素数只能被 1 和它们本身整除。因此,改进的代码检查从 2 到数字的平方根(如果数字较小,则为数字本身)范围内的整除性。...
    编程 发布于2024-11-07
  • 如何在 Pygame 中向鼠标方向发射子弹?
    如何在 Pygame 中向鼠标方向发射子弹?
    如何在 Pygame 中朝鼠标方向发射子弹在 Pygame 中,可以创建一颗朝鼠标方向发射的子弹。为此,需要创建一个代表子弹的类,并根据鼠标位置设置其初始位置和方向。子弹的类首先,为项目符号创建一个类。该类应包含子弹的位置、大小和表面的属性。表面就是将在屏幕上渲染的内容。import pygame ...
    编程 发布于2024-11-07
  • 优化性能的 GG 编码技巧:加快代码速度
    优化性能的 GG 编码技巧:加快代码速度
    在软件开发领域,优化代码性能对于交付用户喜爱的快速响应的应用程序至关重要。无论您从事前端还是后端工作,学习如何编写高效的代码都是至关重要的。在本文中,我们将探讨各种性能优化技术,例如降低时间复杂度、缓存、延迟加载和并行性。我们还将深入探讨如何分析和优化前端和后端代码。让我们开始提高代码的速度和效率!...
    编程 发布于2024-11-07
  • 如何使用 PHP 的 strtotime() 函数查找一周中特定一天的日期?
    如何使用 PHP 的 strtotime() 函数查找一周中特定一天的日期?
    确定一周中指定日期(星期一、星期二等)的日期如果您需要确定日期戳一周中的特定一天,例如星期一、星期二或任何其他工作日,可以使用 strtotime() 函数。当指定日期在本周内尚未出现时,此函数特别有用。例如,要获取下周二的日期戳,只需使用以下代码:strtotime('next tuesday')...
    编程 发布于2024-11-07
  • 使用 Socket.io 和 Redis 构建和部署聊天应用程序。
    使用 Socket.io 和 Redis 构建和部署聊天应用程序。
    在本教程中,我们将使用 Web 套接字构建一个聊天应用程序。当您想要构建需要实时传输数据的应用程序时,Web 套接字非常有用。 在本教程结束时,您将能够设置自己的套接字服务器、实时发送和接收消息、在 Redis 中存储数据以及在渲染和 google cloud run 上部署您的应用程序。 ...
    编程 发布于2024-11-07
  • SQL 连接内部
    SQL 连接内部
    SQL 连接是查询数据库的基础,它允许用户根据指定条件组合多个表中的数据。连接分为两种主要类型:逻辑连接和物理连接。逻辑联接代表组合表中数据的概念方式,而物理联接是指这些联接在数据库系统(例如 RDS(关系数据库服务)或其他 SQL 服务器)中的实际实现。在今天的博文中,我们将揭开 SQL 连接的神...
    编程 发布于2024-11-07
  • 你应该知道的 Javascript 特性
    你应该知道的 Javascript 特性
    在本文中,我们将探讨如何在尝试访问可能是未定义或 null 的数据时防止错误,并且我们将介绍您可以使用的方法用于在必要时有效管理数据。 通过可选链接进行安全访问 在 JavaScript 中,当尝试访问嵌套对象中的值或函数时,如果结果为 undefined,您的代码可能会引发错误。此...
    编程 发布于2024-11-07
  • JavaScript 中的 Promise:理解、处理和掌握异步代码
    JavaScript 中的 Promise:理解、处理和掌握异步代码
    简介 我曾经是一名 Java 开发人员,我记得第一次接触 JavaScript 中的 Promise 时。尽管这个概念看起来很简单,但我仍然无法完全理解 Promise 是如何工作的。当我开始在项目中使用它们并了解它们解决的案例时,情况发生了变化。然后灵光乍现的时刻到来了,一切都变...
    编程 发布于2024-11-07
  • 如何将密钥集成到 Java Spring Boot 中
    如何将密钥集成到 Java Spring Boot 中
    Java Spring Boot 中的密钥简介 密钥提供了一种现代、安全的方式来验证用户身份,而无需依赖传统密码。在本指南中,我们将引导您使用 Thymeleaf 作为模板引擎将密钥集成到 Java Spring Boot 应用程序中。 我们将利用 Corbado 的密钥优先 UI...
    编程 发布于2024-11-07

免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。

Copyright© 2022 湘ICP备2022001581号-3