Springboot项目使用Redis实现发布订阅功能

Redis发布订阅

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息;
消息发布者和订阅者不进行直接通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息。
如下图示:

Redis发布订阅模式Redis发布订阅模式

Redis发布订阅相关命令

将消息发送至指定频道:
PUBLISH channel message

订阅一个或多个频道的消息:
SUBSCRIBE channel [channel ...]

退订一个或多个指定频道:
UNSUBSCRIBE channel [channel ...]

订阅一个或多个符合给定模式的频道:
PSUBSCRIBE pattern [pattern ...]

退订所有符合给定模式的频道:
PUNSUBSCRIBE pattern [pattern ...]

Redis发布订阅客户端演示

打开多个Redis客户端,演示Redis发布订阅功能:

client1: 消息订阅者客户端,订阅指定频道channel01的消息;

127.0.0.1:6379> SUBSCRIBE channel01
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel01"
3) (integer) 1
1) "message"
2) "channel01"
3) "hello world"

client2:消息发布者客户端,发布消息至channel01频道;

127.0.0.1:6379> PUBLISH channel01 'hello world'
(integer) 1
127.0.0.1:6379>

如上所示,在client1订阅者的窗口内,我们可以正常接收到client2消息发布者发布的hello world消息。接下来,我们看下在Springboot项目如何使用Redis实现发布订阅功能。

Springboot项目实现Redis消息发布订阅

1、pom.xml添加redis依赖

如下所示:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- redis 缓存操作 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- pool 对象池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2、定义Redis订阅者消息监听器配置类

@Configuration
public class RedisMessageListenerConfig {

    @Autowired
    public MessageListener redisMessageListener;

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //频道集合
        List<Topic> topicList = new ArrayList<>();
        topicList.add(new PatternTopic("channel-*"));
        container.addMessageListener(redisMessageListener, topicList);
        return container;
    }
}

说明:
Topic接口对应有两个实现类:
ChannelTopic 对应channel类型的命令,监听指定的频道;
PatternTopic 对应pattern类型的命令,监听所有符合给定模式的频道;

3、定义Redis消息监听器处理类

@Component
@Slf4j
public class RedisMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        log.info("收到订阅消息:{}", message.toString());
    }
}

4、使用Redis客户端发送消息或新增接口作为消息发送者

@RestController
public class RedisController {

    private String CHANNEL = "channel-01";

    private RedisTemplate<String, String> redisTemplate;

    public RedisController(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @GetMapping("/publish")
    public void publish(@RequestParam String message) {
        // 发送消息
        redisTemplate.convertAndSend(CHANNEL, message);
    }

}

打印日志:

19:14:10.850 [redisMessageListenerContainer-2] INFO  c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc1
19:14:23.921 [redisMessageListenerContainer-3] INFO  c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc2
19:14:44.121 [redisMessageListenerContainer-4] INFO  c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc3
19:14:47.082 [redisMessageListenerContainer-5] INFO  c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc4

扩展:
也可以通过redisTemplate获取RedisConnection对象来订阅指定频道的消息,如下所示:

@Slf4j
@Service
public class MessageSubscriber {

    private String CHANNEL = "channel-01";

    public MessageSubscriber(RedisTemplate redisTemplate) {
        RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
        redisConnection.subscribe(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] bytes) {
                // 收到消息的处理逻辑
                log.info("Receive message : " + message);
            }
        }, CHANNEL.getBytes(StandardCharsets.UTF_8));
    }
}

4、Redis发布订阅功能使用场景

一、Redis发布订阅功能缺点:
(1)发布的消息无法持久化,存在丢失的风险;
和常规的MQ不同,Redis实现的发布订阅模式消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失;
(2)没有类似ACK的机制
发布方不会确保订阅方成功接收;
(3)集群环境下,会存在重复消费的问题
同一个服务部署多个节点时,每个节点都被认为是一个订阅者,所以都会接收到订阅的消息,可能会导致消息的重复消费;

二、Redis发布订阅与ActiveMQ的比较
(1)ActiveMQ支持多种消息协议,包括AMQP,MQTT,Stomp等,并且支持JMS规范,但Redis没有提供对这些协议的支持;
(2)ActiveMQ提供持久化功能,但Redis无法对消息持久化存储,一旦消息被发送,如果没有订阅者接收,那么消息就会丢失;
(3)ActiveMQ提供了消息传输保障,当客户端连接超时或事务回滚等情况发生时,消息会被重新发送给客户端,Redis没有提供消息传输保障。
总之,ActiveMQ所提供的功能远比Redis发布订阅要复杂,毕竟Redis不是专门做发布订阅的。

总结:Redis发布订阅适合对消息处理可靠性要求不高的场景使用

(完)

添加新评论