」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > Spring Boot中如何使用RocketMQ實現大量訊息消費

Spring Boot中如何使用RocketMQ實現大量訊息消費

發佈於2024-11-07
瀏覽:716

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1.添加依賴

首先,將必要的依賴項新增至您的 pom.xml 檔案:



    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.3.1
    
        
            org.apache.rocketmq
            rocketmq-client
        
    




    org.apache.rocketmq
    rocketmq-client
    5.3.0

2.設定檔bootstrap.yaml

在 bootstrap.yaml 檔案中配置您的 RocketMQ 設定:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3.配置類別MqConfigProperties

建立設定類別MqConfigProperties:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 實施消費者守則

建立消費者類別UserConsumer:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}

5. 生產者範例程式碼

要產生批次訊息,您可以使用以下 UserProducer 類別:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List users, String topic) {
        List messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}

6. 額外的優化建議

  • 效能最佳化:可以調整消費者執行緒池的大小。預設情況下,它設定為consumeThreadMin = 20和consumeThreadMax = 20。高並發場景下,增大執行緒池大小可以提升效能。

  • 錯誤處理:消費失敗時,謹慎使用RECONSUME_LATER,避免無限重試循環。根據您的業務需求設定最大重試次數。

  • 租戶隔離:不同的業務模組使用不同的群組,避免消費錯誤群組的資料。這在生產環境中尤其重要。

版本聲明 本文轉載於:https://dev.to/wilson_evan_1efa5910f8855/how-to-implement-batch-message-consumption-with-rocketmq-in-spring-boot-6gi?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 大批
    大批
    [2 數組是對象,因此它們在JS中也具有方法。 切片(開始):在新數組中提取部分數組,而無需突變原始數組。 令ARR = ['a','b','c','d','e']; // USECASE:提取直到索引作...
    程式設計 發佈於2025-02-19
  • 如何為PostgreSQL中的每個唯一標識符有效地檢索最後一行?
    如何為PostgreSQL中的每個唯一標識符有效地檢索最後一行?
    [2最後一行與數據集中的每個不同標識符關聯。考慮以下數據: 1 2014-02-01 kjkj 1 2014-03-11 ajskj 3 2014-02-01 sfdg 3 2014-06-12 fdsa 為了檢索數據集中每個唯一ID的最後一行信息,您可以在操作員上使用Postgres的有效效...
    程式設計 發佈於2025-02-19
  • 如何使用替換指令在GO MOD中解析模塊路徑差異?
    如何使用替換指令在GO MOD中解析模塊路徑差異?
    克服go mod中的模塊路徑差異 github.com/coreos/etcd/integration imports :解析GO.mod:模塊將其路徑聲明為: go.etcd.io/bbolt [&&&&&&&&&&&&&&&&&&&&&&&&&&&& github.com/coreos/b...
    程式設計 發佈於2025-02-19
  • 在沒有密碼提示的情況下,如何在Ubuntu上安裝MySQL?
    在沒有密碼提示的情況下,如何在Ubuntu上安裝MySQL?
    在ubuntu 使用debconf-set-selections 在安裝過程中避免密碼提示mysql root用戶。這需要以下步驟: sudo debconf-set-selections
    程式設計 發佈於2025-02-19
  • 如何檢查對像是否具有Python中的特定屬性?
    如何檢查對像是否具有Python中的特定屬性?
    方法來確定對象屬性存在尋求一種方法來驗證對像中特定屬性的存在。考慮以下示例,其中嘗試訪問不確定屬性會引起錯誤: >>> a = someClass() >>> A.property Trackback(最近的最新電話): 文件“ ”,第1行, AttributeError:SomeClass實...
    程式設計 發佈於2025-02-19
  • 如何以不同的頻率控制Android設備振動?
    如何以不同的頻率控制Android設備振動?
    控制使用頻率變化的Android設備振動是否想為您的Android應用程序添加觸覺元素?了解如何觸發設備的振動器至關重要。您可以做到這一點:生成基本振動以生成簡單的振動,使用振動器對象:這將導致設備在指定的持續時間內振動。 許可要求通過上述技術,您可以創建在您的Android應用程序中自定義振動,以...
    程式設計 發佈於2025-02-19
  • 如何克服PHP的功能重新定義限制?
    如何克服PHP的功能重新定義限制?
    克服PHP的函數重新定義限制在PHP中,多次定義一個相同名稱的函數是一個no-no。嘗試這樣做,如提供的代碼段所示,將導致可怕的“不能重新列出”錯誤。 //錯誤:“ cance redeclare foo()” 但是,PHP工具腰帶中有一個隱藏的寶石:runkit擴展。它使您能夠靈活地重新定...
    程式設計 發佈於2025-02-19
  • 版本5.6.5之前,使用current_timestamp與時間戳列的current_timestamp與時間戳列有什麼限制?
    版本5.6.5之前,使用current_timestamp與時間戳列的current_timestamp與時間戳列有什麼限制?
    在默認值中使用current_timestamp或mysql版本中的current_timestamp或在5.6.5 這種限制源於遺產實現的關注,這些限制需要為Current_timestamp功能提供特定的實現。消息和相關問題 current_timestamp值: 創建表`foo`( `...
    程式設計 發佈於2025-02-19
  • 如何在JavaScript對像中動態設置鍵?
    如何在JavaScript對像中動態設置鍵?
    如何為JavaScript對像變量創建動態鍵,嘗試為JavaScript對象創建動態鍵,使用此Syntax jsObj['key' i] = 'example' 1;將不起作用。正確的方法採用方括號:他們維持一個長度屬性,該屬性反映了數字屬性(索引)和一個數字屬性的數量。標準對像沒有模仿這...
    程式設計 發佈於2025-02-19
  • 如何使用組在MySQL中旋轉數據?
    如何使用組在MySQL中旋轉數據?
    在關係數據庫中使用mysql組使用mysql組來調整查詢結果。在這裡,我們面對一個共同的挑戰:使用組的組將數據從基於行的基於列的基於列的轉換。通過子句以及條件匯總函數,例如總和或情況。讓我們考慮以下查詢: select d.data_timestamp, sum(data_id = 1 tata...
    程式設計 發佈於2025-02-19
  • 可以在純CS中將多個粘性元素彼此堆疊在一起嗎?
    可以在純CS中將多個粘性元素彼此堆疊在一起嗎?
    https://webthemez.com/demo/sticky-multi-header-scroll/index.html </main> <section> display:grid; grid-template-col...
    程式設計 發佈於2025-02-19
  • HTML格式標籤
    HTML格式標籤
    HTML 格式化元素 **HTML Formatting is a process of formatting text for better look and feel. HTML provides us ability to format text without us...
    程式設計 發佈於2025-02-19
  • 如何修復\“常規錯誤:2006 MySQL Server在插入數據時已經消失\”?
    如何修復\“常規錯誤:2006 MySQL Server在插入數據時已經消失\”?
    How to Resolve "General error: 2006 MySQL server has gone away" While Inserting RecordsIntroduction: connect to to to Database connect to t...
    程式設計 發佈於2025-02-19
  • 如何限制動態大小的父元素中元素的滾動範圍?
    如何限制動態大小的父元素中元素的滾動範圍?
    在交互式界面中實現垂直滾動元素的CSS高度限制 考慮一個佈局,其中我們具有與可滾動的映射div一起移動的subollable map div用戶的垂直滾動,同時保持其與固定側邊欄的對齊方式。但是,地圖的滾動無限期擴展,超過了視口的高度,阻止用戶訪問頁面頁腳。 可以限制地圖的滾動,我們可以利用CS...
    程式設計 發佈於2025-02-19
  • 為什麼使用Firefox後退按鈕時JavaScript執行停止?
    為什麼使用Firefox後退按鈕時JavaScript執行停止?
    導航歷史記錄問題:JavaScript使用Firefox Back Back 此行為是由瀏覽器緩存JavaScript資源引起的。要解決此問題並確保在後續頁面訪問中執行腳本,Firefox用戶應設置一個空功能以在window.onunload事件上調用。 pre> window.onload...
    程式設計 發佈於2025-02-19

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3