RocketMQ不同类型消息发送
消息属性
RocketMQ 消息构成非常简单,如下图所示。
topic,表示要发送的消息的主题。
body 表示消息的存储内容
properties 表示消息属性
transactionId 会在事务消息中使用。
Message 可以设置的属性值包括:
字段名 | 默认值 | 必要性 | 说明 |
---|---|---|---|
Topic | null | 必填 | 消息所属 topic 的名称 |
Body | null | 必填 | 消息体 |
Tags | null | 选填 | 消息标签,方便服务器过滤使用。目前只支持每个消息设置一个 |
Keys | null | 选填 | 代表这条消息的业务关键词 |
Flag | 0 | 选填 完全由应用来设置,RocketMQ不做干预 | |
DelayTimeLevel | 0 | 选填 | 消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK | true | 选填 | 表示消息是否在服务器落盘后才返回应答。 |
Tag
Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
Tag:消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。
Keys
Apache RocketMQ每个消息可以在业务层面设置唯一标识码keys字段,方便将来定位消息。Broker端会为每个消息创建索引(哈希索引),应用可以通过Topic和key
来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突、
例如:
// 订单ID
String orderId = "20034568923546";
message.setKeys(orderId);
队列
为了支持高并发和水平扩展,需要对Topic进行分区,在RocketMQ中被称为队列,一个Topic可能有多个队列,并且可能分布在不同的Broker上。
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在Topic的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点。
Apache RocketMQ拥有丰富的消息类型,可以支持不同的应用场景,下面我们主要介绍各种类型消息的发送。
普通消息发送
创建Springboot项目,在pom.xml添加RocketMQ依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
Aoache RocketMQ发送方式有三种:同步、异步和单向传输。前两种消息类型是可靠的,因为无论他们是否发送成功都会有响应。
(1)同步发送
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后,才会继续发送下一条消息。可靠的同步传输被广泛应用于各种场景,如重要的消息通知、短消息通知等。
如下图示:
同步发送生产者示例:
public class SyncProducer {
public static void main(String[] args) throws Exception {
// (1)初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("group_producer_01");
// (2)设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// (3)创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("test",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// (4)利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
新增两个消费者类,代码如下:
RocketMQConsumer类:
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("test", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (Message message : list) {
String messageBody = new String(message.getBody(), "UTF-8");
System.out.println(String.format("topic=%s, body=%s", message.getTopic(), messageBody));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started %n");
}
}
RocketMQConsumer2类:
public class RocketMQConsumer2 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("test", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (Message message : list) {
String messageBody = new String(message.getBody(), "UTF-8");
System.out.println(String.format("topic=%s, body=%s", message.getTopic(), messageBody));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started %n");
}
}
先启动两个消费者,然后执行生产者main方法发送消息,
生产者控制台日志打印:
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BC30000, offsetMsgId=C0A81F5F00002A9F000000000000A866, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=7], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BC90001, offsetMsgId=C0A81F5F00002A9F000000000000A950, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=0], queueOffset=20]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCA0002, offsetMsgId=C0A81F5F00002A9F000000000000AA3A, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=1], queueOffset=17]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCB0003, offsetMsgId=C0A81F5F00002A9F000000000000AB24, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=2], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCC0004, offsetMsgId=C0A81F5F00002A9F000000000000AC0E, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=3], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCD0005, offsetMsgId=C0A81F5F00002A9F000000000000ACF8, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=4], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCE0006, offsetMsgId=C0A81F5F00002A9F000000000000ADE2, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=5], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCF0007, offsetMsgId=C0A81F5F00002A9F000000000000AECC, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=6], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BD00008, offsetMsgId=C0A81F5F00002A9F000000000000AFB6, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=7], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BD30009, offsetMsgId=C0A81F5F00002A9F000000000000B0A0, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=0], queueOffset=21]
17:18:19.098 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
17:18:19.102 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
17:18:19.102 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.95:10911] result: true
Process finished with exit code 0
RocketMQConsumer消费者控制台日志:
topic=test, body=Hello RocketMQ 0
topic=test, body=Hello RocketMQ 5
topic=test, body=Hello RocketMQ 6
topic=test, body=Hello RocketMQ 7
topic=test, body=Hello RocketMQ 8
RocketMQConsumer2消费者控制台日志:
topic=test, body=Hello RocketMQ 1
topic=test, body=Hello RocketMQ 4
topic=test, body=Hello RocketMQ 2
topic=test, body=Hello RocketMQ 3
topic=test, body=Hello RocketMQ 9
(2)异步发送
异步发送是指消息发送方发出一条消息后,不会等待服务端返回响应,就接着发送下一条消息的通讯方式。
如下图示:
异步发送需要实现异步发送回调接口(SendCallback),通过回调接口处理服务端响应。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
异步发送示例如下:
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("group_producer_01");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TOPIC_ASYNC_TEST",
"TagA",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
(3)单向模式发送
如下图示:
发送方只负责发送消息,不等待服务器返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集等。
单向模式发送示例:
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("producer_group_03");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
顺序消息发送
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在Apache RocketMQ中支持分区顺序消息。我们可以按照某个标准对消息进行分区,同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。
RocketMQ消息的顺序性分为两部分,生产顺序性和消费顺序性。
生产顺序性:RocketMQ通过生产者和服务端的协议保障单个生产者串行的发送消息,并按序存储和持久化。如需保证消息生产的顺序性必须满足以下条件:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者之间产生的消息无法判定其先后顺序;
- 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将服务判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。
服务端顺序存储逻辑如下图示:
public class OrderMsgProducer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("producer_group_03");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TOPIC_ORDER_MSG_TEST", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
这里的区别主要是调用了SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,
MessageQueueSelector 是队列选择器,arg是一个Java Object对象,可以传入作为消息发送分区的分类标准。
MessageQueueSelector的接口如下:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
mqs:是可以发送的队列
msg:是要发送的消息
arg:是上述send接口中传入的Object对象
select方法返回值:是该消息需要发送到的队列
上述示例中,是以orderId作为分区分类的标准,对所有队列个数取余,即orderId % mqs.size(),对于相同的orderId的消息将被发送到同一个队列中。
生产环境中建议选择最细粒度的分区键进行拆分,例如:将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
顺序消息的一致性
如果一个Broker掉线,那么此时队列总数是否会变化?
如果发生变化,那么同一个ShardingKey的消息就会发送到不同的队列上,造成乱序。
如果不发生变化,那消息将会发送到掉线Broker的队列上,必然会发送失败。
因此,Apache RocketMQ提供了两种模式,如果要保证严格顺序而不是可用性,创建Topic时要指定-o参数(--order)为true,表示顺序消息;
例如,创建TopicTest主题:
sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
其次,要保证NameServer中的配置orderMessageEnable和returnOrderTopicConfigToBroker必须是true。
如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
延迟消息发送
延迟消息发送是指消息发送到RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消息。
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发、使用RocketMQ的延时消息可以简化定时调度任务的开发逻辑,
实现高性能、可扩展、高可靠的定时触发能力。
延时消息约束
Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
延时消息示例代码如下:
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// Launch producer
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("test", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
批量消息发送
在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批后再进行发送,可以增加吞吐率,并减少API和网络调用次数。
批量消息发送示例如下:
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "test";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
事务消息发送
在一些对数据一致性有强需求的场景,可以使用RocketMQ事务消息来解决,从而保证上下游数据的一致性。
详细参见RocketMQ分布式事务解决方案:
https://whwtree.com/archives/rocketmq-transaction.html
参考官网地址:
https://rocketmq.apache.org/zh/docs/4.x/producer/04concept1/
(完)