基于SpringBoot+Redis实现一个完整的发布订阅系统
作者:J_liaty
前言
在分布式系统中,消息队列和发布订阅模式是实现服务间解耦、异步通信的重要手段。Redis不仅是一个高性能的键值存储系统,还提供了发布订阅(Publish/Subscribe)功能,可以轻松实现消息的发布与订阅。
本文将详细介绍如何在Spring Boot项目中集成Redis,实现一个简单但功能完整的发布订阅系统。
一、环境准备
1.1 Redis发布订阅简介
Redis的发布订阅是一种消息通信模式:
- 发布者(Publisher):向指定频道发送消息
- 订阅者(Subscriber):订阅感兴趣的频道并接收消息
- 频道(Channel):消息的逻辑分类,用于连接发布者和订阅者
特点:
- 解耦:发布者和订阅者无需知道彼此的存在
- 一对多:一个消息可以同时发送给多个订阅者
- 实时性:消息实时传递
- 无持久化:消息不持久化,订阅者离线会丢失消息
二、项目搭建
2.1 创建Spring Boot项目
使用Spring Initializr创建项目,或者手动创建Maven项目并添加以下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>redis-pubsub-demo</artifactId>
<version>1.0.0</version>
<name>Redis Pub/Sub Demo</name>
<description>Spring Boot + Redis 发布订阅示例</description>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Lombok(可选,简化代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Fastjson(JSON序列化) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2 配置Redis连接
在src/main/resources/application.yml中配置Redis连接信息:
server:
port: 8080
spring:
application:
name: redis-pubsub-demo
# Redis配置
redis:
host: localhost
port: 6379
password: # 如果有密码则填写
database: 0
timeout: 3000
# Lettuce连接池配置
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5
max-wait: 3000ms
# 日志配置
logging:
level:
com.example: DEBUG
org.springframework.data.redis: DEBUG
三、核心实现
3.1 Redis配置类
创建Redis配置类,配置序列化器和消息监听容器:
package com.example.redispubsub.config;
import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer;
import io.lettuce.core.LettuceConnection;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
* Redis配置类 - 配置Redis连接和序列化
*
* @author example
* @date 2024-02-27
*/
@Configuration
public class RedisConfig {
/**
* 配置RedisTemplate
* 使用Fastjson进行对象序列化,key使用String序列化
*
* @param factory Redis连接工厂,Spring自动注入
* @return 配置好的RedisTemplate实例
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 创建RedisTemplate实例
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置Redis连接工厂
template.setConnectionFactory(factory);
// ========================================
// 配置Key的序列化器(使用String序列化)
// ========================================
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer); // 设置key的序列化器
template.setHashKeySerializer(stringRedisSerializer); // 设置Hash中key的序列化器
// ========================================
// 配置Value的序列化器(使用Fastjson序列化)
// ========================================
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
template.setValueSerializer(fastJsonRedisSerializer); // 设置value的序列化器
template.setHashValueSerializer(fastJsonRedisSerializer); // 设置Hash中value的序列化器
// 执行初始化操作(必须调用)
template.afterPropertiesSet();
return template;
}
/**
* 配置Redis连接池(Lettuce)
* 优化连接池参数以提高性能
*
* @return Lettuce客户端配置
*/
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
// 配置连接池
GenericObjectPoolConfig<LettuceConnection> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20); // 最大连接数
poolConfig.setMaxIdle(10); // 最大空闲连接数
poolConfig.setMinIdle(5); // 最小空闲连接数
poolConfig.setMaxWaitMillis(3000); // 最大等待时间
// 创建Lettuce客户端配置
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(3)) // 命令超时时间
.shutdownTimeout(Duration.ZERO) // 关闭超时时间
.poolConfig(poolConfig) // 连接池配置
.build();
// 创建连接工厂并返回
return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379), clientConfig);
}
}
3.2 消息实体类
定义消息实体,用于传输消息内容:
package com.example.redispubsub.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 消息实体类
*
* @author example
* @date 2024-02-27
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息ID
*/
private String messageId;
/**
* 消息类型
*/
private String messageType;
/**
* 消息内容
*/
private String content;
/**
* 发送时间
*/
private LocalDateTime sendTime;
/**
* 发送者
*/
private String sender;
/**
* 创建消息
*/
public static Message create(String messageType, String content, String sender) {
Message message = new Message();
message.setMessageId(System.currentTimeMillis() + "");
message.setMessageType(messageType);
message.setContent(content);
message.setSendTime(LocalDateTime.now());
message.setSender(sender);
return message;
}
}
3.3 消息发布者
创建消息发布者服务,用于向指定频道发布消息:
package com.example.redispubsub.publisher;
import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
/**
* 消息发布者服务 - 负责向Redis频道发布消息
*
* @author example
* @date 2024-02-27
*/
@Slf4j
@Service
public class MessagePublisher {
// ========================================
// 注入RedisTemplate,用于操作Redis
// 注意:RedisTemplate是线程安全的,可以在多线程环境中共享使用
// ========================================
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 发布消息到指定频道
*
* 工作原理:
* 1. 使用RedisTemplate的convertAndSend方法发布消息
* 2. 该方法会自动将消息对象序列化为字节数组
* 3. 然后将消息发送到Redis服务器的指定频道
* 4. 所有订阅该频道的订阅者都会收到该消息
*
* @param channel 频道名称(字符串)
* @param message 要发布的消息对象(会被序列化为JSON)
*/
public void publish(String channel, Message message) {
try {
// 记录日志:准备发布消息
log.info("准备发布消息到频道 [{}]: {}", channel, JSON.toJSONString(message));
// ========================================
// 核心发布逻辑
// ========================================
// convertAndSend是RedisTemplate提供的方法,用于发布消息
// 它会自动进行对象的序列化和反序列化
redisTemplate.convertAndSend(channel, message);
// 记录日志:消息发布成功
log.info("消息发布成功到频道 [{}]", channel);
} catch (Exception e) {
// 记录错误日志
log.error("消息发布失败: {}", e.getMessage(), e);
// 实际项目中,这里可以添加重试逻辑或告警通知
}
}
/**
* 发布文本消息(便捷方法)
* 自动创建Message对象并发布
*
* @param channel 频道名称
* @param content 消息内容
*/
public void publishText(String channel, String content) {
// 创建消息对象:类型为TEXT,发送者为SYSTEM
Message message = Message.create("TEXT", content, "SYSTEM");
// 调用核心发布方法
publish(channel, message);
}
}
3.4 消息订阅者
创建消息订阅者,实现消息监听接口:
package com.example.redispubsub.subscriber;
import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 聊天消息订阅者 - 实现MessageListener接口,监听chat频道的消息
*
* 工作原理:
* 1. 实现Spring Data Redis的MessageListener接口
* 2. 当Redis服务器有消息发布到订阅的频道时,会自动调用onMessage方法
* 3. onMessage方法在独立的线程中执行,不会阻塞主线程
*
* @author example
* @date 2024-02-27
*/
@Slf4j
@Component
public class ChatMessageSubscriber implements MessageListener {
/**
* 消息接收回调方法(由Redis消息监听容器自动调用)
*
* @param message Redis消息对象,包含频道名和消息体
* @param pattern 频道匹配模式(如果是模式订阅,则为匹配的模式;否则为null)
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// ========================================
// 步骤1:解析频道名称
// ========================================
String channel = new String(message.getChannel());
log.debug("收到消息,来自频道: {}", channel);
// ========================================
// 步骤2:解析消息体
// ========================================
byte[] body = message.getBody();
// 使用Fastjson反序列化为Message对象
// 注意:需要确保发布的对象和接收的对象类型一致
Message msg = JSON.parseObject(body, Message.class);
// ========================================
// 步骤3:记录消息详情
// ========================================
log.info("收到消息 - 频道: {}, 消息ID: {}, 类型: {}, 内容: {}, 发送者: {}, 时间: {}",
channel,
msg.getMessageId(),
msg.getMessageType(),
msg.getContent(),
msg.getSender(),
msg.getSendTime());
// ========================================
// 步骤4:处理业务逻辑
// ========================================
handleMessage(msg);
} catch (Exception e) {
// 异常处理:记录错误日志,避免因单个消息处理失败影响后续消息接收
log.error("消息处理失败: {}", e.getMessage(), e);
// 实际项目中,可以考虑将失败的消息记录到死信队列或数据库,以便后续重试
}
}
/**
* 处理消息业务逻辑
* 根据不同的消息类型执行不同的业务处理
*
* @param message 消息对象
*/
private void handleMessage(Message message) {
// 根据消息类型进行分支处理
switch (message.getMessageType()) {
case "TEXT":
// 处理文本消息:记录日志、保存到数据库等
log.info("处理文本消息: {}", message.getContent());
// TODO: 添加具体的业务逻辑,例如保存聊天记录
break;
case "ORDER":
// 处理订单消息:更新订单状态、发送通知等
log.info("处理订单消息: {}", message.getContent());
// TODO: 添加订单相关的业务逻辑
break;
case "NOTIFICATION":
// 处理通知消息:推送到前端、发送邮件等
log.info("处理通知消息: {}", message.getContent());
// TODO: 添加通知相关的业务逻辑
break;
default:
// 未知消息类型:记录警告日志
log.warn("未知消息类型: {}, 内容: {}", message.getMessageType(), message.getContent());
}
}
}
3.5 消息监听器配置
创建监听器配置类,将订阅者注册到监听容器中:
package com.example.redispubsub.config;
import com.example.redispubsub.subscriber.ChatMessageSubscriber;
import com.example.redispubsub.subscriber.OrderMessageSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* 消息监听器配置类 - 负责注册订阅者和频道的绑定关系
*
* @author example
* @date 2024-02-27
*/
@Configuration
public class MessageListenerConfig {
// ========================================
// 注入订阅者实例(由Spring容器管理)
// ========================================
@Autowired
private ChatMessageSubscriber chatMessageSubscriber; // 聊天消息订阅者
@Autowired(required = false)
private OrderMessageSubscriber orderMessageSubscriber; // 订单消息订阅者(可选)
@Autowired(required = false)
private NewsMessageSubscriber newsMessageSubscriber; // 新闻消息订阅者(可选)
/**
* 配置Redis消息监听容器
*
* 功能说明:
* 1. RedisMessageListenerContainer是Spring Data Redis提供的消息监听容器
* 2. 它会自动管理与Redis的连接,并启动独立的线程来接收消息
* 3. 在容器启动时,会自动注册所有订阅者到对应的频道
* 4. 当Redis服务器有消息发布到已订阅的频道时,容器会立即接收并调用对应的订阅者
*
* @param connectionFactory Redis连接工厂,由Spring自动注入
* @return 配置好的消息监听容器
*/
@Bean
public RedisMessageListenerContainer messageListenerContainer(
RedisConnectionFactory connectionFactory) {
// ========================================
// 创建消息监听容器
// ========================================
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// ========================================
// 注册订阅者到对应的频道
// ========================================
// 注册聊天消息订阅者 - 监听"chat"频道
// PatternTopic: 支持通配符模式的频道订阅,例如 "chat.*" 会匹配所有以chat.开头的频道
container.addMessageListener(chatMessageSubscriber, new PatternTopic("chat"));
System.out.println("[Redis Pub/Sub] 已注册订阅者: ChatMessageSubscriber -> 频道: chat");
// 注册订单消息订阅者 - 监听"order"频道(如果存在)
if (orderMessageSubscriber != null) {
container.addMessageListener(orderMessageSubscriber, new PatternTopic("order"));
System.out.println("[Redis Pub/Sub] 已注册订阅者: OrderMessageSubscriber -> 频道: order");
}
// 注册新闻消息订阅者 - 使用模式订阅,监听所有"news.*"开头的频道(如果存在)
if (newsMessageSubscriber != null) {
// 模式订阅示例:news.sports、news.tech、news.weather 都会被监听到
container.addMessageListener(newsMessageSubscriber, new PatternTopic("news.*"));
System.out.println("[Redis Pub/Sub] 已注册订阅者: NewsMessageSubscriber -> 模式: news.*");
}
// ========================================
// 可选:设置线程池(提升性能)
// ========================================
// 如果需要自定义线程池,可以取消下面注释
/*
container.setTaskExecutor(Executors.newFixedThreadPool(10));
*/
return container;
}
}
四、创建测试接口
4.1 发布消息接口
创建Controller,提供HTTP接口用于发布消息:
package com.example.redispubsub.controller;
import com.example.redispubsub.model.Message;
import com.example.redispubsub.publisher.MessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
/**
* 消息发布控制器
*
* @author example
* @date 2024-02-27
*/
@Slf4j
@RestController
@RequestMapping("/api/message")
public class MessageController {
@Autowired
private MessagePublisher messagePublisher;
/**
* 发布文本消息到chat频道
*
* @param content 消息内容
* @return 发布结果
*/
@PostMapping("/publish")
public Map<String, Object> publishMessage(@RequestParam String content) {
Map<String, Object> result = new HashMap<>();
try {
messagePublisher.publishText("chat", content);
result.put("code", 200);
result.put("message", "消息发布成功");
result.put("data", content);
} catch (Exception e) {
log.error("消息发布失败", e);
result.put("code", 500);
result.put("message", "消息发布失败: " + e.getMessage());
}
return result;
}
/**
* 发布自定义消息
*
* @param message 消息对象
* @return 发布结果
*/
@PostMapping("/publish/custom")
public Map<String, Object> publishCustomMessage(@RequestBody Message message) {
Map<String, Object> result = new HashMap<>();
try {
messagePublisher.publish("chat", message);
result.put("code", 200);
result.put("message", "消息发布成功");
result.put("data", message);
} catch (Exception e) {
log.error("消息发布失败", e);
result.put("code", 500);
result.put("message", "消息发布失败: " + e.getMessage());
}
return result;
}
/**
* 向指定频道发布消息
*
* @param channel 频道名称
* @param content 消息内容
* @return 发布结果
*/
@PostMapping("/publish/{channel}")
public Map<String, Object> publishToChannel(
@PathVariable String channel,
@RequestParam String content) {
Map<String, Object> result = new HashMap<>();
try {
messagePublisher.publishText(channel, content);
result.put("code", 200);
result.put("message", "消息发布到频道 [" + channel + "] 成功");
result.put("data", content);
} catch (Exception e) {
log.error("消息发布失败", e);
result.put("code", 500);
result.put("message", "消息发布失败: " + e.getMessage());
}
return result;
}
}
五、启动类
创建Spring Boot启动类:
package com.example.redispubsub;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Redis Pub/Sub Demo 启动类
*
* @author example
* @date 2024-02-27
*/
@SpringBootApplication
public class RedisPubSubApplication {
public static void main(String[] args) {
SpringApplication.run(RedisPubSubApplication.class, args);
System.out.println("========================================");
System.out.println("Redis Pub/Sub Demo 启动成功!");
System.out.println("访问地址: http://localhost:8080");
System.out.println("========================================");
}
}
六、测试验证
6.1 启动Redis服务
确保Redis服务已启动,可以使用以下命令检查:
# 检查Redis服务状态 redis-cli ping # 如果返回 PONG,说明Redis服务正常
6.2 启动应用
运行RedisPubSubApplication的main方法,启动Spring Boot应用。
6.3 测试发布消息
方式1:使用curl命令
# 发布简单文本消息 curl -X POST "http://localhost:8080/api/message/publish?content=Hello%20Redis%20Pub/Sub" # 发布到指定频道 curl -X POST "http://localhost:8080/api/message/publish/order?content=订单创建成功"
方式2:使用Postman
请求1:发布简单文本消息
- URL:
POST http://localhost:8080/api/message/publish - 参数:
content=测试消息
请求2:发布自定义消息
- URL:
POST http://localhost:8080/api/message/publish/custom - Body (JSON):
{
"messageId": "1001",
"messageType": "ORDER",
"content": "订单号: ORDER-20240227-001",
"sender": "ORDER_SERVICE",
"sendTime": "2024-02-27T10:30:00"
}
6.4 查看日志输出
应用启动后,当有消息发布到订阅的频道时,控制台会输出类似以下日志:
收到消息 - 频道: chat, 消息ID: 1709025600000, 类型: TEXT, 内容: 测试消息, 发送者: SYSTEM, 时间: 2024-02-27T10:30:00 处理文本消息: 测试消息
七、进阶功能
7.1 多订阅者示例
创建多个订阅者,订阅不同的频道:
package com.example.redispubsub.subscriber;
import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 订单消息订阅者 - 监听order频道的订单相关消息
*
* 实际应用场景:
* - 当订单创建、支付、发货时,发布消息到order频道
* - 本订阅者接收到订单消息后,进行相应的业务处理
*
* @author example
* @date 2024-02-27
*/
@Slf4j
@Component
public class OrderMessageSubscriber implements MessageListener {
/**
* 订单消息接收回调方法
*
* @param message Redis消息对象
* @param pattern 频道匹配模式
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// 获取频道名称
String channel = new String(message.getChannel());
// 反序列化消息对象
Message msg = JSON.parseObject(message.getBody(), Message.class);
log.info("[订单订阅者] 收到订单消息 - 频道: {}, 消息ID: {}, 内容: {}",
channel, msg.getMessageId(), msg.getContent());
// ========================================
// 订单业务处理逻辑
// ========================================
handleOrderMessage(msg);
} catch (Exception e) {
log.error("[订单订阅者] 消息处理失败: {}", e.getMessage(), e);
}
}
/**
* 处理订单业务逻辑
*
* @param message 订单消息
*/
private void handleOrderMessage(Message message) {
String content = message.getContent();
// 根据订单内容判断订单类型(实际项目中可以定义更复杂的消息结构)
if (content.contains("创建")) {
log.info("[订单订阅者] 处理订单创建: {}", content);
// TODO: 1. 订单数据入库
// TODO: 2. 发送订单创建通知
// TODO: 3. 触发库存扣减逻辑
} else if (content.contains("支付")) {
log.info("[订单订阅者] 处理订单支付: {}", content);
// TODO: 1. 更新订单状态为已支付
// TODO: 2. 触发发货流程
// TODO: 3. 发送支付成功通知
} else if (content.contains("发货")) {
log.info("[订单订阅者] 处理订单发货: {}", content);
// TODO: 1. 更新订单状态为已发货
// TODO: 2. 发送物流信息给客户
}
}
}
在配置类中注册:
@Configuration
public class MessageListenerConfig {
@Autowired
private ChatMessageSubscriber chatMessageSubscriber;
@Autowired
private OrderMessageSubscriber orderMessageSubscriber;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 订阅chat频道
container.addMessageListener(chatMessageSubscriber, new PatternTopic("chat"));
// 订阅order频道
container.addMessageListener(orderMessageSubscriber, new PatternTopic("order"));
return container;
}
}
7.2 模式订阅
使用模式订阅,订阅匹配的多个频道:
/**
* 新闻消息订阅者 - 使用模式订阅
*/
@Slf4j
@Component
public class NewsMessageSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
Message msg = JSON.parseObject(message.getBody(), Message.class);
log.info("[新闻订阅者] 频道: {}, 新闻: {}", channel, msg.getContent());
}
}
配置模式订阅:
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 模式订阅:订阅所有news.*开头的频道
container.addMessageListener(newsMessageSubscriber, new PatternTopic("news.*"));
return container;
}
八、注意事项
8.1 Redis发布订阅的局限性
消息不持久化
- Redis发布订阅是实时消息传递
- 如果订阅者离线,期间发布的消息会永久丢失
- 不适用于需要消息可靠性的场景
无消息确认机制
- 发布者无法知道消息是否被订阅者接收
- 没有重试机制
扩展性限制
- 在高并发场景下可能成为性能瓶颈
- Redis集群模式下,Pub/Sub可能存在限制
8.2 生产环境建议
如果需要消息可靠性和持久化,建议使用以下方案:
Redis Streams(Redis 5.0+)
- 支持消息持久化
- 支持消费者组
- 支持消息确认和重试
专业消息队列
- RabbitMQ
- Apache Kafka
- Apache RocketMQ
8.3 性能优化
合理设置连接池
- 根据并发量调整
max-active、max-idle等参数
消息序列化优化
- 选择高效的序列化方式(如Fastjson、Protobuf)
批量处理
- 对于大量消息,考虑批量发布和接收
九、完整项目结构
redis-pubsub-demo/ ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com/example/redispubsub/ │ │ │ ├── config/ │ │ │ │ ├── MessageListenerConfig.java │ │ │ │ └── RedisConfig.java │ │ │ ├── controller/ │ │ │ │ └── MessageController.java │ │ │ ├── model/ │ │ │ │ └── Message.java │ │ │ ├── publisher/ │ │ │ │ └── MessagePublisher.java │ │ │ ├── subscriber/ │ │ │ │ ├── ChatMessageSubscriber.java │ │ │ │ ├── OrderMessageSubscriber.java │ │ │ │ └── NewsMessageSubscriber.java │ │ │ └── RedisPubSubApplication.java │ │ └── resources/ │ │ └── application.yml │ └── test/ │ └── java/ └── pom.xml
十、总结
本文详细介绍了在Spring Boot中集成Redis实现发布订阅功能的完整流程,包括:
- 项目搭建:Maven依赖配置、Redis连接配置
- 核心实现:Redis配置类、消息实体、发布者、订阅者
- 测试验证:启动应用、发布消息、查看日志
- 进阶功能:多订阅者、模式订阅
- 注意事项:局限性和生产环境建议
适用场景:
- 实时通知系统
- 聊天应用
- 实时数据推送
- 微服务间事件通知
Redis发布订阅是一个轻量级的消息传递方案,适合对消息可靠性要求不高、但需要实时性的场景。如果需要更强的消息可靠性,建议使用Redis Streams或专业消息队列。
以上就是基于SpringBoot+Redis实现一个完整的发布订阅系统的详细内容,更多关于SpringBoot Redis发布订阅功能的资料请关注脚本之家其它相关文章!
