Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的 rocketmq-spring-boot-starter
实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。
环境准备
- Spring Boot 3.x 项目
- RocketMQ 服务器:版本V5.3,包括
NameServer
和Broker
,可以本地搭建或者使用云服务,搭建部分后面单独出教程。 - RocketMQ 依赖:Spring Boot 与 RocketMQ 的整合依赖
rocketmq-spring-boot-starter
。
1. 配置项目依赖
在 Spring Boot 项目的 pom.xml
中添加 RocketMQ 相关依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version> <!-- 或选择最新稳定版本 -->
</dependency>
2. 配置 RocketMQ 信息
在 application.yml
文件中配置 RocketMQ 的相关连接信息,包括 name-server
和其他基础配置。
2.1配置文件
rocketmq-spring-boot-starter 2.2.0(不含)以下版本
spring:
rocketmq:
name-server: localhost:9876 # NameServer 地址,集群使用';'隔开
producer:
group: springboot-producer-group # 生产者组名称
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-next-server: true
access-key: RocketMQ # 若启用了 ACL 功能
secret-key: 12345678 # 若启用了 ACL 功能
consumer:
group: springboot-consumer-group # 消费者组名称
topic: test-topic # 订阅的主题
access-key: RocketMQ # 若启用了 ACL 功能
secret-key: 12345678 # 若启用了 ACL 功能
rocketmq-spring-boot-starter 2.2.0及其以上版本:
rocketmq:
name-server: localhost:9876 # NameServer 地址,集群使用';'隔开
producer:
group: springboot-producer-group # 生产者组名称
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-next-server: true
access-key: RocketMQ # 若启用了 ACL 功能
secret-key: 12345678 # 若启用了 ACL 功能
consumer:
group: springboot-consumer-group # 消费者组名称
topic: test-topic # 订阅的主题
access-key: RocketMQ # 若启用了 ACL 功能
secret-key: 12345678 # 若启用了 ACL 功能
2.2导入自动配置类
按照之前介绍的自动配置,想让 RocketMQ 配生效,需要在启动类上添加如下代码或单独写个配置类:
@Import(RocketMQAutoConfiguration.class)
否在会报错:A component required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
@SpringBootApplication
@Import(RocketMQAutoConfiguration.class)
public class SpringBoot308RocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBoot308RocketmqApplication.class, args);
}
}
2.3创建Topic
示例代码仅一本地一个服务,即一个生产者和消费者,只需选一个broker,否在有些消息将无法消费。
3. 生产者代码示例
在 Spring Boot 项目中创建一个生产者服务,可以作为工具类,使用 RocketMQ 发送消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class RocketMQProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 发送简单消息
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
System.out.println("Message sent: " + message);
}
}
3.1同步消息
同步发送消息是指,Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。RocketMQ 同步消息的方法形如 syncXx()。
/**
* 同步类型消息
*
* @param topic
* @param message
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.syncSend(topic, message);
System.out.println("Message sent: " + message);
}
3.2 异步消息
异步发送消息是指,Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。RocketMQ 同步消息的方法形如 asyncXx()。
/**
* 异步类型消息
*
* @param topic
* @param message
*/
public void asyncSendMessage(String topic, String message) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Async message sent: " + message);
}
@Override
public void onException(Throwable e) {
System.out.println("Async message error: " + e);
}
});
System.out.println("Message sent: " + message);
}
3.3 单向消息
单向发送消息是指,Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。
/**
* 发送单向消息
*
* @param topic
* @param message
*/
public void sendOneWayMessage(String topic, String message) {
rocketMQTemplate.sendOneWay(topic, message);
System.out.println("One way message sent: " + message);
}
3.4顺序消息
顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
/**
* 发送顺序消息
*/
public void sendOrderlyMessage(String topic, String message, String shardingKey) {
for (int i = 0; i < 10; i++) {
String orderlyMessage = message + i;
rocketMQTemplate.syncSendOrderly(topic, orderlyMessage, shardingKey);
System.out.println("Orderly message sent: " + orderlyMessage + " with shardingKey: " + shardingKey);
}
}
3.5延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级默认有18个,可以在broker.conf中增加配置,然后重启broker:
# 延时等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
代码很简单:
/**
* 发送延迟消息
*
* @param topic
* @param message
* @param delayLevel
*/
public void sendDelayedMessage(String topic, String message, int delayLevel) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 3000, delayLevel);
System.out.println("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
}
除此之外,RocketMQ 还支持事务消息、批量消息、消息过滤等,后面再详细介绍。
4. 消费者代码示例
使用 @RocketMQMessageListener
注解来订阅主题并监听消息的到达,处理消息的消费逻辑。
package com.example.boot308rocketmq;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author CoderJia
* @create 2024/09/09 15:12
* @Description
**/
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "springboot-consumer-group")
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理接收到的消息
System.out.println("Received message: " + message);
}
}
5. 调用生产者发送消息
为了便于测试,创建一个简单的 Spring Boot Controller层代码,用于调用生产者发送消息。
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.boot308rocketmq.controller;
import com.example.boot308rocketmq.RocketMQProducer;
import jakarta.annotation.Resource;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* @author CoderJia
* @create 2024/9/9 下午 15:08
* @Description
**/
@Controller
public class MessageController {
@Resource
private RocketMQProducer rocketMQProducer;
@GetMapping("/sendMessage")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
rocketMQProducer.sendOneWayMessage("test-topic", message);
return ResponseEntity.ok("Message sent: " + message);
}
@GetMapping("/sendOrderlyMessage")
public ResponseEntity<String> sendOrderlyMessage(@RequestParam String message) {
rocketMQProducer.sendOrderlyMessage("test-topic", message, "orderKey");
return ResponseEntity.ok("Message sent: " + message);
}
@GetMapping("/sendDelayedMessage")
public ResponseEntity<String> sendDelayedMessage(@RequestParam String message, @RequestParam int delayLevel) {
rocketMQProducer.sendDelayedMessage("test-topic", message, delayLevel);
return ResponseEntity.ok("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
}
}
6. 启动项目并验证
- 启动 RocketMQ 的
NameServer
和Broker
。 - 启动 Spring Boot 项目。
- 打开浏览器或者使用 Postman 访问发送消息的接口:
普通消息:
http://localhost:8080/sendMessage?message=HelloRocketMQ
顺序消息:
http://localhost:8080/sendOrderlyMessage?message=HelloRocketMQ
延迟消息:
http://localhost:8080/sendDelayedMessage?message=HelloDelayedRocketMQ&delayLevel=3
7. 整合总结
- 生产者:通过
RocketMQTemplate
提供了发送消息的方法,包括同步消息、异步消息、顺序消息、延迟消息等。 - 消费者:使用
@RocketMQMessageListener
注解,能够便捷地监听指定主题并消费消息。 - 事务消息:RocketMQ 还支持事务消息,适合实现两阶段提交的事务模型,后面会着重介绍。
这种整合方式在 Spring Boot 3 中非常自然,并且 rocketmq-spring-boot-starter
进一步简化了配置和集成,使得开发者可以专注于业务逻辑的实现。