Springboot项目集成RocketMQ

本文主要讲述Springboot项目如何集成RocketMQ。

1、RocketMQ安装及启动

RocketMQ安装步骤详见地址:
https://whwtree.com/archives/rocketmq-install.html

2、启动RocketMQ服务

// 启动NameServer
cd D:\tools\rocketmq-all-5.0.0-bin-release
start bin\mqnamesrv.cmd

启动后显示:The Name Server boot success. 则表示NameServer启动成功。

// 启动Broker+Proxy
start bin\mqbroker.cmd -n localhost:9876

启动后显示:The broker[LAPTOP-NH51ES2C, 192.168.31.95:10911] boot success. 则表示broker启动成功。

3、创建Springboot项目并添加RocketMQ配置及相关类

(1)pom.xml引入RocketMQ依赖,具体内容如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

(2)application.properties配置文件新增rocketmq配置,具体内容如下:

server.port=8081
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test1
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-times-when-send-failed=3
rocketmq.producer.retry-next-server=true
rocketmq.producer.retry-times-when-send-async-failed=3

(3)创建RocketMQ消息生产者Controller类:RocketMQController.java,具体内容如下:

@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping("/send")
    public String send(String message) {
        rocketMQTemplate.convertAndSend("test", message);
        return "send ok";
    }

    @RequestMapping("/syncSend")
    public String syncSend(String message) {
        rocketMQTemplate.syncSend("test", message);
        return "syncSend ok";
    }

    @RequestMapping("/asyncSend")
    public String asyncSend(String message) {
        rocketMQTemplate.asyncSend("test", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功..." + sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败..." + throwable.getMessage());
            }
        });
        return "asyncSend ok";
    }
}

(4)创建RocketMQ消费者类:RocketMQConsumer.java,具体内容如下:

@Component
@RocketMQMessageListener(consumerGroup = "group01", topic = "test")
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费消息:" + s);
    }
}

4、创建主题测试RocketMQ消息发送与接收

创建test主题:
bin\mqadmin.cmd updateTopic -b localhost:10911 -n localhost:9876 -t test

启动Springboot工程,访问生产者相关接口测试RocketMQ消息:

(1)发送普通消息接口:
http://localhost:8081/rocketmq/send?message=001

控制台日志打印:

消费消息:001

(2)发送同步消息接口:
http://localhost:8081/rocketmq/syncSend?message=002

控制台日志打印:

消费消息:002

(3)发送异步消息接口(带回调方法):
http://localhost:8081/rocketmq/asyncSend?message=003
控制台日志打印:

消费消息:003

(完)

添加新评论