RocketMQ不同类型消息发送

消息属性

RocketMQ 消息构成非常简单,如下图所示。

topic,表示要发送的消息的主题。
body 表示消息的存储内容
properties 表示消息属性
transactionId 会在事务消息中使用。

Message 可以设置的属性值包括:

字段名默认值必要性说明
Topicnull必填消息所属 topic 的名称
Bodynull必填消息体
Tagsnull选填消息标签,方便服务器过滤使用。目前只支持每个消息设置一个
Keysnull选填代表这条消息的业务关键词
Flag0选填 完全由应用来设置,RocketMQ不做干预
DelayTimeLevel0选填消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费
WaitStoreMsgOKtrue选填表示消息是否在服务器落盘后才返回应答。

Tag

Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
Tag:消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。

Keys

Apache RocketMQ每个消息可以在业务层面设置唯一标识码keys字段,方便将来定位消息。Broker端会为每个消息创建索引(哈希索引),应用可以通过Topic和key
来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突、
例如:
// 订单ID
String orderId = "20034568923546";
message.setKeys(orderId);

队列

为了支持高并发和水平扩展,需要对Topic进行分区,在RocketMQ中被称为队列,一个Topic可能有多个队列,并且可能分布在不同的Broker上。

一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在Topic的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点。

Apache RocketMQ拥有丰富的消息类型,可以支持不同的应用场景,下面我们主要介绍各种类型消息的发送。

普通消息发送

创建Springboot项目,在pom.xml添加RocketMQ依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.4</version>
</dependency>

Aoache RocketMQ发送方式有三种:同步、异步和单向传输。前两种消息类型是可靠的,因为无论他们是否发送成功都会有响应。

(1)同步发送

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后,才会继续发送下一条消息。可靠的同步传输被广泛应用于各种场景,如重要的消息通知、短消息通知等。

如下图示:

producer-sendmsg-sync.pngproducer-sendmsg-sync.png

同步发送生产者示例:

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // (1)初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group_producer_01");
        // (2)设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            // (3)创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message("test",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // (4)利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // 一旦producer不再使用,关闭producer
        producer.shutdown();
    }
}

新增两个消费者类,代码如下:

RocketMQConsumer类:

public class RocketMQConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("test", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    for (Message message : list) {
                        String messageBody = new String(message.getBody(), "UTF-8");
                        System.out.println(String.format("topic=%s, body=%s", message.getTopic(), messageBody));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started %n");
    }
}

RocketMQConsumer2类:

public class RocketMQConsumer2 {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("test", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    for (Message message : list) {
                        String messageBody = new String(message.getBody(), "UTF-8");
                        System.out.println(String.format("topic=%s, body=%s", message.getTopic(), messageBody));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started %n");
    }
}

先启动两个消费者,然后执行生产者main方法发送消息,

生产者控制台日志打印:

SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BC30000, offsetMsgId=C0A81F5F00002A9F000000000000A866, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=7], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BC90001, offsetMsgId=C0A81F5F00002A9F000000000000A950, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=0], queueOffset=20]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCA0002, offsetMsgId=C0A81F5F00002A9F000000000000AA3A, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=1], queueOffset=17]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCB0003, offsetMsgId=C0A81F5F00002A9F000000000000AB24, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=2], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCC0004, offsetMsgId=C0A81F5F00002A9F000000000000AC0E, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=3], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCD0005, offsetMsgId=C0A81F5F00002A9F000000000000ACF8, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=4], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCE0006, offsetMsgId=C0A81F5F00002A9F000000000000ADE2, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=5], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BCF0007, offsetMsgId=C0A81F5F00002A9F000000000000AECC, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=6], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BD00008, offsetMsgId=C0A81F5F00002A9F000000000000AFB6, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=7], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=7F000001388418B4AAC27F4F3BD30009, offsetMsgId=C0A81F5F00002A9F000000000000B0A0, messageQueue=MessageQueue [topic=test, brokerName=LAPTOP-NH51ES2C, queueId=0], queueOffset=21]
17:18:19.098 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
17:18:19.102 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
17:18:19.102 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.95:10911] result: true

Process finished with exit code 0

RocketMQConsumer消费者控制台日志:

topic=test, body=Hello RocketMQ 0
topic=test, body=Hello RocketMQ 5
topic=test, body=Hello RocketMQ 6
topic=test, body=Hello RocketMQ 7
topic=test, body=Hello RocketMQ 8

RocketMQConsumer2消费者控制台日志:

topic=test, body=Hello RocketMQ 1
topic=test, body=Hello RocketMQ 4
topic=test, body=Hello RocketMQ 2
topic=test, body=Hello RocketMQ 3
topic=test, body=Hello RocketMQ 9

(2)异步发送

异步发送是指消息发送方发出一条消息后,不会等待服务端返回响应,就接着发送下一条消息的通讯方式。

如下图示:

producer-sendmsg-async.pngproducer-sendmsg-async.png

异步发送需要实现异步发送回调接口(SendCallback),通过回调接口处理服务端响应。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

异步发送示例如下:

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group_producer_01");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动producer
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message("TOPIC_ASYNC_TEST",
                    "TagA",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // 一旦producer不再使用,关闭producer
        producer.shutdown();
    }
}

(3)单向模式发送

如下图示:

producer-sendmsg-ignore-result.pngproducer-sendmsg-ignore-result.png

发送方只负责发送消息,不等待服务器返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集等。

单向模式发送示例:

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_03");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
            producer.sendOneway(msg);
        }
        // 一旦producer不再使用,关闭producer
        producer.shutdown();
    }
}

顺序消息发送

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在Apache RocketMQ中支持分区顺序消息。我们可以按照某个标准对消息进行分区,同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

RocketMQ消息的顺序性分为两部分,生产顺序性和消费顺序性。

生产顺序性:RocketMQ通过生产者和服务端的协议保障单个生产者串行的发送消息,并按序存储和持久化。如需保证消息生产的顺序性必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者之间产生的消息无法判定其先后顺序;
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将服务判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。

服务端顺序存储逻辑如下图示:

public class OrderMsgProducer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {    
            DefaultMQProducer producer = new DefaultMQProducer("producer_group_03");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                        new Message("TOPIC_ORDER_MSG_TEST", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这里的区别主要是调用了SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,
MessageQueueSelector 是队列选择器,arg是一个Java Object对象,可以传入作为消息发送分区的分类标准。

MessageQueueSelector的接口如下:

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

mqs:是可以发送的队列
msg:是要发送的消息
arg:是上述send接口中传入的Object对象
select方法返回值:是该消息需要发送到的队列

上述示例中,是以orderId作为分区分类的标准,对所有队列个数取余,即orderId % mqs.size(),对于相同的orderId的消息将被发送到同一个队列中。

生产环境中建议选择最细粒度的分区键进行拆分,例如:将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

顺序消息的一致性

如果一个Broker掉线,那么此时队列总数是否会变化?
如果发生变化,那么同一个ShardingKey的消息就会发送到不同的队列上,造成乱序。
如果不发生变化,那消息将会发送到掉线Broker的队列上,必然会发送失败。

因此,Apache RocketMQ提供了两种模式,如果要保证严格顺序而不是可用性,创建Topic时要指定-o参数(--order)为true,表示顺序消息;

例如,创建TopicTest主题:
sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876

其次,要保证NameServer中的配置orderMessageEnable和returnOrderTopicConfigToBroker必须是true。

如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

延迟消息发送

延迟消息发送是指消息发送到RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消息。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发、使用RocketMQ的延时消息可以简化定时调度任务的开发逻辑,
实现高性能、可扩展、高可靠的定时触发能力。

延时消息约束

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

延时消息示例代码如下:

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("test", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }

        // Shutdown producer after use.
        producer.shutdown();
    }

}

批量消息发送

在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批后再进行发送,可以增加吞吐率,并减少API和网络调用次数。

批量消息发送示例如下:

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "test";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
}

事务消息发送

在一些对数据一致性有强需求的场景,可以使用RocketMQ事务消息来解决,从而保证上下游数据的一致性。

详细参见RocketMQ分布式事务解决方案:
https://whwtree.com/archives/rocketmq-transaction.html

参考官网地址:
https://rocketmq.apache.org/zh/docs/4.x/producer/04concept1/
(完)

最后修改于:2022年11月26日 01:59

添加新评论