Springboot项目集成RocketMQ
本文主要讲述Springboot项目如何集成RocketMQ。
1、RocketMQ安装及启动
RocketMQ安装步骤详见地址:
https://whwtree.com/archives/rocketmq-install.html
2、启动RocketMQ服务
// 启动NameServer
cd D:\tools\rocketmq-all-5.0.0-bin-release
start bin\mqnamesrv.cmd
启动后显示:The Name Server boot success. 则表示NameServer启动成功。
// 启动Broker+Proxy
start bin\mqbroker.cmd -n localhost:9876
启动后显示:The broker[LAPTOP-NH51ES2C, 192.168.31.95:10911] boot success. 则表示broker启动成功。
3、创建Springboot项目并添加RocketMQ配置及相关类
(1)pom.xml引入RocketMQ依赖,具体内容如下:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
(2)application.properties配置文件新增rocketmq配置,具体内容如下:
server.port=8081
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test1
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-times-when-send-failed=3
rocketmq.producer.retry-next-server=true
rocketmq.producer.retry-times-when-send-async-failed=3
(3)创建RocketMQ消息生产者Controller类:RocketMQController.java,具体内容如下:
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/send")
public String send(String message) {
rocketMQTemplate.convertAndSend("test", message);
return "send ok";
}
@RequestMapping("/syncSend")
public String syncSend(String message) {
rocketMQTemplate.syncSend("test", message);
return "syncSend ok";
}
@RequestMapping("/asyncSend")
public String asyncSend(String message) {
rocketMQTemplate.asyncSend("test", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功..." + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败..." + throwable.getMessage());
}
});
return "asyncSend ok";
}
}
(4)创建RocketMQ消费者类:RocketMQConsumer.java,具体内容如下:
@Component
@RocketMQMessageListener(consumerGroup = "group01", topic = "test")
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费消息:" + s);
}
}
4、创建主题测试RocketMQ消息发送与接收
创建test主题:
bin\mqadmin.cmd updateTopic -b localhost:10911 -n localhost:9876 -t test
启动Springboot工程,访问生产者相关接口测试RocketMQ消息:
(1)发送普通消息接口:
http://localhost:8081/rocketmq/send?message=001
控制台日志打印:
消费消息:001
(2)发送同步消息接口:
http://localhost:8081/rocketmq/syncSend?message=002
控制台日志打印:
消费消息:002
(3)发送异步消息接口(带回调方法):
http://localhost:8081/rocketmq/asyncSend?message=003
控制台日志打印:
消费消息:003
(完)