「労働者が自分の仕事をうまくやりたいなら、まず自分の道具を研ぎ澄まさなければなりません。」 - 孔子、「論語。陸霊公」
表紙 > プログラミング > Spring Boot で RocketMQ を使用してバッチ メッセージ消費を実装する方法

Spring Boot で RocketMQ を使用してバッチ メッセージ消費を実装する方法

2024 年 11 月 7 日に公開
ブラウズ:446

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1. 依存関係の追加

まず、必要な依存関係を pom.xml ファイルに追加します。


org.apache.rocketmqrocketmq-spring-boot-starter2.3.1org.apache.rocketmqrocketmq-clientorg.apache.rocketmqrocketmq-client5.3.0

2. 設定ファイル bootstrap.yaml

bootstrap.yaml ファイルで RocketMQ 設定を構成します:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3. 構成クラス MqConfigProperties

構成クラス MqConfigProperties を作成します:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 消費者規範の実施

コンシューマー クラス UserConsumer:
を作成します。

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}

5. プロデューサーのサンプルコード

バッチ メッセージを生成するには、次の UserProducer クラスを使用できます:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List users, String topic) {
        List messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}

6. 追加の最適化に関する提案

  • パフォーマンスの最適化: コンシューマ スレッド プールのサイズを調整できます。デフォルトでは、consumptionThreadMin=20 および ConsumerThreadMax=20 に設定されています。同時実行性の高いシナリオでは、スレッド プールのサイズを増やすとパフォーマンスが向上します。

  • エラー処理: 消費が失敗した場合は、無限再試行ループを避けるために RECONSUME_LATER に注意してください。ビジネス要件に基づいて最大再試行回数を設定します。

  • テナント分離: 間違ったグループからのデータの消費を避けるために、異なるビジネス モジュールに異なるグループを使用します。これは実稼働環境では特に重要です。

リリースステートメント この記事は次の場所に転載されています: https://dev.to/wilson_evan_1efa5910f8855/how-to-implement-batch-message-consumption-with-rocketmq-in-spring-boot-6gi?1 侵害がある場合は、study_golang にご連絡ください。 @163.com 削除
最新のチュートリアル もっと>
  • C++ で std::locale を使用して数値をカンマでフォーマットする方法
    C++ で std::locale を使用して数値をカンマでフォーマットする方法
    C でのカンマを使用した数値の書式設定 C では、 std::locale クラスは、カンマを使用して数値を書式設定するロケール依存の方法を提供します。 .std::locale with std::stringstream数値をカンマ付きの文字列としてフォーマットするには、std::locale ...
    プログラミング 2024 年 11 月 7 日に公開
  • Python で素数シーケンス内の奇数の出力を回避するには?
    Python で素数シーケンス内の奇数の出力を回避するには?
    Python で一連の素数を出力する方法多くのプログラマは、Python で素数を正確に出力する関数を作成するのに苦労しています。よくある問題の 1 つは、代わりに奇数のリストを出力することです。この問題を修正するには、素数のプロパティを完全に理解し、コードを変更することが不可欠です。素数は 1 と...
    プログラミング 2024 年 11 月 7 日に公開
  • Pygameでマウスの方向に弾丸を発射するにはどうすればよいですか?
    Pygameでマウスの方向に弾丸を発射するにはどうすればよいですか?
    Pygame でマウスの方向に弾丸を発射する方法Pygame では、マウスの方向に発射される弾丸を作成できます。これを行うには、弾丸を表すクラスを作成し、マウスの位置に基づいてその初期位置と方向を設定する必要があります。弾丸のクラスまず、弾丸のクラスを作成します。このクラスには、弾丸の位置、サイズ、...
    プログラミング 2024 年 11 月 7 日に公開
  • パフォーマンスを最適化するための GG コーディングのヒント: コードの高速化
    パフォーマンスを最適化するための GG コーディングのヒント: コードの高速化
    ソフトウェア開発の世界では、ユーザーが好む高速で応答性の高いアプリケーションを提供するには、コードのパフォーマンスを最適化することが重要です。フロントエンドで作業しているかバックエンドで作業しているかに関係なく、効率的なコードの書き方を学ぶことが不可欠です。この記事では、時間の複雑さの軽減、キャッシ...
    プログラミング 2024 年 11 月 7 日に公開
  • PHP の strtotime() 関数を使用して特定の曜日の日付を見つけるにはどうすればよいですか?
    PHP の strtotime() 関数を使用して特定の曜日の日付を見つけるにはどうすればよいですか?
    特定の曜日(月曜日、火曜日など)の日付を決定する日付スタンプを確認する必要がある場合月曜日、火曜日、その他の平日など、特定の曜日には strtotime() 関数を使用できます。この関数は、今週中に指定された日がまだ発生していない場合に特に便利です。たとえば、次の火曜日の日付スタンプを取得するには、...
    プログラミング 2024 年 11 月 7 日に公開
  • Socket.io と Redis を使用してチャット アプリケーションを構築し、デプロイします。
    Socket.io と Redis を使用してチャット アプリケーションを構築し、デプロイします。
    このチュートリアルでは、Web ソケットを使用してチャット アプリケーションを構築します。 Web ソケットは、リアルタイムのデータ転送を必要とするアプリケーションを構築する場合に非常に役立ちます。 このチュートリアルを終えると、独自のソケット サーバーをセットアップし、リアルタイムでメッセージを送...
    プログラミング 2024 年 11 月 7 日に公開
  • 内部 SQL 結合
    内部 SQL 結合
    SQL 結合はデータベースのクエリの基本であり、ユーザーは指定された条件に基づいて複数のテーブルのデータを結合できます。結合は、論理結合と物理結合の 2 つの主なタイプに分類されます。論理結合はテーブルのデータを組み合わせる概念的な方法を表し、物理結合は RDS (リレーショナル データベース サー...
    プログラミング 2024 年 11 月 7 日に公開
  • 知っておくべきJavaScriptの機能
    知っておくべきJavaScriptの機能
    この記事では、未定義または null の可能性があるデータにアクセスしようとするときにエラーを防ぐ方法を検討し、できる方法を見ていきます。 必要に応じてデータを効果的に管理するために使用します. オプションのチェーンによる安全なアクセス JavaScript で、入れ子になったオブジ...
    プログラミング 2024 年 11 月 7 日に公開
  • JavaScript の約束: 非同期コードの理解、処理、および習得
    JavaScript の約束: 非同期コードの理解、処理、および習得
    イントロ 私は Java 開発者として働いていましたが、JavaScript の Promise に初めて触れたときのことを覚えています。コンセプトは単純そうに見えましたが、Promise がどのように機能するのかを完全に理解することはできませんでした。プロジェクトでそれらを使用し...
    プログラミング 2024 年 11 月 7 日に公開
  • パスキーを Java Spring Boot に統合する方法
    パスキーを Java Spring Boot に統合する方法
    Java Spring Boot のパスキーの概要 パスキーは、従来のパスワードに依存せずにユーザーを認証する最新の安全な方法を提供します。このガイドでは、Thymeleaf をテンプレート エンジンとして使用して、Java Spring Boot アプリケーションにパスキーを統合...
    プログラミング 2024 年 11 月 7 日に公開
  • グアテマラの前環境大臣としてのマリオ・ロベルト・ロハス・エスピノの影響
    グアテマラの前環境大臣としてのマリオ・ロベルト・ロハス・エスピノの影響
    マリオ・ロベルト・ロハス・エスピノはグアテマラの元環境大臣として、国の持続可能な発展に貢献した環境政策の実施において重要な役割を果たしました。同省長官としての彼の経営は、特に環境立法や保全プロジェクトの面で重要な遺産を残した。この記事では、彼の影響力と、任期中に彼が推進した主な政策について探ります。...
    プログラミング 2024 年 11 月 7 日に公開
  • データ収集のためにクラスのすべてのインスタンスを追跡してアクセスするにはどうすればよいですか?
    データ収集のためにクラスのすべてのインスタンスを追跡してアクセスするにはどうすればよいですか?
    データ収集のためのクラス インスタンスの追跡プログラムの終わりに近づいており、複数の変数から特定の変数を抽出する必要があると想像してください。クラスのインスタンスを使用して辞書を作成します。このタスクは、集約または分析する必要がある重要なデータを保持するオブジェクトを操作するときに発生することがあり...
    プログラミング 2024 年 11 月 7 日に公開
  • PHP 連想配列内で検索する方法 – 簡単なヒント
    PHP 連想配列内で検索する方法 – 簡単なヒント
    連想配列は PHP の基本的なデータ構造であり、開発者はキーと値のペアを保存できます。これらは多用途であり、構造化データを表すためによく使用されます。 PHP 連想配列内の特定の要素を検索するのは一般的なタスクです。ただし、PHP で使用できるほとんどのネイティブ関数は、単純な配列でもうまく機能しま...
    プログラミング 2024 年 11 月 7 日に公開
  • Web 開発の未来: すべての開発者が知っておくべき新たなトレンドとテクノロジー
    Web 開発の未来: すべての開発者が知っておくべき新たなトレンドとテクノロジー
    導入 Web 開発は、初期の静的な HTML ページとシンプルな CSS デザインから大きく進歩しました。技術の進歩と、よりダイナミックでインタラクティブで応答性の高い Web サイトに対するユーザーの需要の高まりにより、この分野は長年にわたって急速に進化してきました。インターネッ...
    プログラミング 2024 年 11 月 7 日に公開
  • ays 初心者の Python コード者は ChatGPT を使用できます
    ays 初心者の Python コード者は ChatGPT を使用できます
    初心者の Python 開発者は、きれいなコードの作成からエラーのトラブルシューティングまで、数え切れないほどの課題に直面します。 ChatGPT は、生産性を向上させ、コーディング作業を合理化するための秘密兵器となります。際限なくドキュメントやフォーラムを調べる代わりに、ChatGPT に直接質...
    プログラミング 2024 年 11 月 7 日に公開

免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。

Copyright© 2022 湘ICP备2022001581号-3