Fügen Sie zunächst die erforderlichen Abhängigkeiten zu Ihrer pom.xml-Datei hinzu:
org.apache.rocketmq rocketmq-spring-boot-starter 2.3.1 org.apache.rocketmq rocketmq-client org.apache.rocketmq rocketmq-client 5.3.0
Konfigurieren Sie Ihre RocketMQ-Einstellungen in der Datei „bootstrap.yaml“:
rocketmq: name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses consumer: group: consume-group-test access-key: access # Configure if ACL is used secret-key: secret consume-message-batch-max-size: 50 # Max messages per batch pull-batch-size: 100 # Max messages pulled from Broker topics: project: "group-topic-1" groups: project: "consume-group-1" # Use different groups for different business processes
Erstellen Sie die Konfigurationsklasse MqConfigProperties:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import lombok.Data; import java.io.Serializable; /** * RocketMQ Configuration Class */ @Data @Component @ConfigurationProperties(prefix = "rocketmq") public class MqConfigProperties implements Serializable { private static final long serialVersionUID = 1L; @Autowired private RocketMQProperties rocketMQProperties; private TopicProperties topics; private GroupProperties groups; /** * Topic Configuration Class */ @Data public static class TopicProperties implements Serializable { private static final long serialVersionUID = 1L; private String project; } /** * Consumer Group Configuration Class */ @Data public static class GroupProperties implements Serializable { private static final long serialVersionUID = 1L; private String project; } }
Erstellen Sie die Verbraucherklasse UserConsumer:
import com.alibaba.fastjson2.JSONObject; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.context.ApplicationContext; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; import java.util.List; /** * Batch Consumer Implementation */ @Component @Slf4j public class UserConsumer implements SmartLifecycle { @Resource private MqConfigProperties mqConfigProperties; @Resource private ApplicationContext applicationContext; private volatile boolean running; private DefaultMQPushConsumer consumer; @Override public void start() { if (isRunning()) { throw new IllegalStateException("Consumer is already running"); } initConsumer(); setRunning(true); log.info("UserConsumer started successfully."); } @Override public void stop() { if (isRunning() && consumer != null) { consumer.shutdown(); setRunning(false); log.info("UserConsumer stopped."); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } private void initConsumer() { String topic = mqConfigProperties.getTopics().getProject(); String group = mqConfigProperties.getGroups().getProject(); String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer(); String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey(); String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey(); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey); consumer = rpcHook != null ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely()) : new DefaultMQPushConsumer(group); consumer.setNamesrvAddr(nameServer); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(100); // Set the batch size for consumption consumer.subscribe(topic, "*"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { log.info("Received {} messages", msgs.size()); for (MessageExt message : msgs) { String body = new String(message.getBody()); log.info("Processing message: {}", body); User user = JSONObject.parseObject(body, User.class); processUser(user); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group); } private void processUser(User user) { log.info("Processing user with ID: {}", user.getId()); // Handle user-related business logic } }
Um Batch-Nachrichten zu erstellen, können Sie die folgende UserProducer-Klasse verwenden:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; public class UserProducer { private DefaultMQProducer producer; public void sendBatchMessages(Listusers, String topic) { List messages = new ArrayList(); for (User user : users) { messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes())); } try { producer.send(messages); } catch (Exception e) { log.error("Error sending batch messages", e); } } }
Leistungsoptimierung: Sie können die Größe des Consumer-Thread-Pools anpassen. Standardmäßig ist es auf „consumerThreadMin=20“ und „consumemThreadMax=20“ eingestellt. In Szenarien mit hoher Parallelität kann die Erhöhung der Thread-Poolgröße die Leistung verbessern.
Fehlerbehandlung: Wenn die Nutzung fehlschlägt, seien Sie bei RECONSUME_LATER vorsichtig, um endlose Wiederholungsschleifen zu vermeiden. Legen Sie eine maximale Anzahl von Wiederholungsversuchen basierend auf Ihren Geschäftsanforderungen fest.
Mandantenisolierung: Verwenden Sie unterschiedliche Gruppen für unterschiedliche Geschäftsmodule, um zu vermeiden, dass Daten aus der falschen Gruppe verbraucht werden. Dies ist besonders in Produktionsumgebungen von entscheidender Bedeutung.
Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.
Copyright© 2022 湘ICP备2022001581号-3