Springboot项目使用Redis实现发布订阅功能
Redis发布订阅
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息;
消息发布者和订阅者不进行直接通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息。
如下图示:
Redis发布订阅相关命令
将消息发送至指定频道:
PUBLISH channel message
订阅一个或多个频道的消息:
SUBSCRIBE channel [channel ...]
退订一个或多个指定频道:
UNSUBSCRIBE channel [channel ...]
订阅一个或多个符合给定模式的频道:
PSUBSCRIBE pattern [pattern ...]
退订所有符合给定模式的频道:
PUNSUBSCRIBE pattern [pattern ...]
Redis发布订阅客户端演示
打开多个Redis客户端,演示Redis发布订阅功能:
client1: 消息订阅者客户端,订阅指定频道channel01的消息;
127.0.0.1:6379> SUBSCRIBE channel01
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel01"
3) (integer) 1
1) "message"
2) "channel01"
3) "hello world"
client2:消息发布者客户端,发布消息至channel01频道;
127.0.0.1:6379> PUBLISH channel01 'hello world'
(integer) 1
127.0.0.1:6379>
如上所示,在client1订阅者的窗口内,我们可以正常接收到client2消息发布者发布的hello world消息。接下来,我们看下在Springboot项目如何使用Redis实现发布订阅功能。
Springboot项目实现Redis消息发布订阅
1、pom.xml添加redis依赖
如下所示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- redis 缓存操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- pool 对象池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
2、定义Redis订阅者消息监听器配置类
@Configuration
public class RedisMessageListenerConfig {
@Autowired
public MessageListener redisMessageListener;
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//频道集合
List<Topic> topicList = new ArrayList<>();
topicList.add(new PatternTopic("channel-*"));
container.addMessageListener(redisMessageListener, topicList);
return container;
}
}
说明:
Topic接口对应有两个实现类:
ChannelTopic 对应channel类型的命令,监听指定的频道;
PatternTopic 对应pattern类型的命令,监听所有符合给定模式的频道;
3、定义Redis消息监听器处理类
@Component
@Slf4j
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("收到订阅消息:{}", message.toString());
}
}
4、使用Redis客户端发送消息或新增接口作为消息发送者
@RestController
public class RedisController {
private String CHANNEL = "channel-01";
private RedisTemplate<String, String> redisTemplate;
public RedisController(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@GetMapping("/publish")
public void publish(@RequestParam String message) {
// 发送消息
redisTemplate.convertAndSend(CHANNEL, message);
}
}
打印日志:
19:14:10.850 [redisMessageListenerContainer-2] INFO c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc1
19:14:23.921 [redisMessageListenerContainer-3] INFO c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc2
19:14:44.121 [redisMessageListenerContainer-4] INFO c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc3
19:14:47.082 [redisMessageListenerContainer-5] INFO c.s.c.c.r.RedisMessageListener - [onMessage,14] - 收到订阅消息:abc4
扩展:
也可以通过redisTemplate获取RedisConnection对象来订阅指定频道的消息,如下所示:
@Slf4j
@Service
public class MessageSubscriber {
private String CHANNEL = "channel-01";
public MessageSubscriber(RedisTemplate redisTemplate) {
RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
redisConnection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] bytes) {
// 收到消息的处理逻辑
log.info("Receive message : " + message);
}
}, CHANNEL.getBytes(StandardCharsets.UTF_8));
}
}
4、Redis发布订阅功能使用场景
一、Redis发布订阅功能缺点:
(1)发布的消息无法持久化,存在丢失的风险;
和常规的MQ不同,Redis实现的发布订阅模式消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失;
(2)没有类似ACK的机制
发布方不会确保订阅方成功接收;
(3)集群环境下,会存在重复消费的问题
同一个服务部署多个节点时,每个节点都被认为是一个订阅者,所以都会接收到订阅的消息,可能会导致消息的重复消费;
二、Redis发布订阅与ActiveMQ的比较
(1)ActiveMQ支持多种消息协议,包括AMQP,MQTT,Stomp等,并且支持JMS规范,但Redis没有提供对这些协议的支持;
(2)ActiveMQ提供持久化功能,但Redis无法对消息持久化存储,一旦消息被发送,如果没有订阅者接收,那么消息就会丢失;
(3)ActiveMQ提供了消息传输保障,当客户端连接超时或事务回滚等情况发生时,消息会被重新发送给客户端,Redis没有提供消息传输保障。
总之,ActiveMQ所提供的功能远比Redis发布订阅要复杂,毕竟Redis不是专门做发布订阅的。
总结:Redis发布订阅适合对消息处理可靠性要求不高的场景使用。
(完)