java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot Redis发布订阅功能

基于SpringBoot+Redis实现一个完整的发布订阅系统

作者:J_liaty

在分布式系统中,消息队列和发布订阅模式是实现服务间解耦、异步通信的重要手段,本文将详细介绍如何在Spring Boot项目中集成Redis,实现一个简单但功能完整的发布订阅系统,需要的朋友可以参考下

前言

在分布式系统中,消息队列和发布订阅模式是实现服务间解耦、异步通信的重要手段。Redis不仅是一个高性能的键值存储系统,还提供了发布订阅(Publish/Subscribe)功能,可以轻松实现消息的发布与订阅。

本文将详细介绍如何在Spring Boot项目中集成Redis,实现一个简单但功能完整的发布订阅系统。

一、环境准备

1.1 Redis发布订阅简介

Redis的发布订阅是一种消息通信模式:

特点

二、项目搭建

2.1 创建Spring Boot项目

使用Spring Initializr创建项目,或者手动创建Maven项目并添加以下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>redis-pubsub-demo</artifactId>
    <version>1.0.0</version>
    <name>Redis Pub/Sub Demo</name>
    <description>Spring Boot + Redis 发布订阅示例</description>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Data Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Lombok(可选,简化代码) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Fastjson(JSON序列化) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- Spring Boot Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.2 配置Redis连接

src/main/resources/application.yml中配置Redis连接信息:

server:
  port: 8080

spring:
  application:
    name: redis-pubsub-demo

  # Redis配置
  redis:
    host: localhost
    port: 6379
    password:        # 如果有密码则填写
    database: 0
    timeout: 3000

    # Lettuce连接池配置
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 3000ms

# 日志配置
logging:
  level:
    com.example: DEBUG
    org.springframework.data.redis: DEBUG

三、核心实现

3.1 Redis配置类

创建Redis配置类,配置序列化器和消息监听容器:

package com.example.redispubsub.config;

import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer;
import io.lettuce.core.LettuceConnection;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * Redis配置类 - 配置Redis连接和序列化
 *
 * @author example
 * @date 2024-02-27
 */
@Configuration
public class RedisConfig {

    /**
     * 配置RedisTemplate
     * 使用Fastjson进行对象序列化,key使用String序列化
     *
     * @param factory Redis连接工厂,Spring自动注入
     * @return 配置好的RedisTemplate实例
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        // 创建RedisTemplate实例
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置Redis连接工厂
        template.setConnectionFactory(factory);

        // ========================================
        // 配置Key的序列化器(使用String序列化)
        // ========================================
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);          // 设置key的序列化器
        template.setHashKeySerializer(stringRedisSerializer);      // 设置Hash中key的序列化器

        // ========================================
        // 配置Value的序列化器(使用Fastjson序列化)
        // ========================================
        GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        template.setValueSerializer(fastJsonRedisSerializer);      // 设置value的序列化器
        template.setHashValueSerializer(fastJsonRedisSerializer);  // 设置Hash中value的序列化器

        // 执行初始化操作(必须调用)
        template.afterPropertiesSet();
        return template;
    }

    /**
     * 配置Redis连接池(Lettuce)
     * 优化连接池参数以提高性能
     *
     * @return Lettuce客户端配置
     */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // 配置连接池
        GenericObjectPoolConfig<LettuceConnection> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(20);        // 最大连接数
        poolConfig.setMaxIdle(10);         // 最大空闲连接数
        poolConfig.setMinIdle(5);          // 最小空闲连接数
        poolConfig.setMaxWaitMillis(3000); // 最大等待时间

        // 创建Lettuce客户端配置
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .commandTimeout(Duration.ofSeconds(3))  // 命令超时时间
                .shutdownTimeout(Duration.ZERO)          // 关闭超时时间
                .poolConfig(poolConfig)                 // 连接池配置
                .build();

        // 创建连接工厂并返回
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379), clientConfig);
    }
}

3.2 消息实体类

定义消息实体,用于传输消息内容:

package com.example.redispubsub.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * 消息实体类
 *
 * @author example
 * @date 2024-02-27
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 消息ID
     */
    private String messageId;

    /**
     * 消息类型
     */
    private String messageType;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 发送时间
     */
    private LocalDateTime sendTime;

    /**
     * 发送者
     */
    private String sender;

    /**
     * 创建消息
     */
    public static Message create(String messageType, String content, String sender) {
        Message message = new Message();
        message.setMessageId(System.currentTimeMillis() + "");
        message.setMessageType(messageType);
        message.setContent(content);
        message.setSendTime(LocalDateTime.now());
        message.setSender(sender);
        return message;
    }
}

3.3 消息发布者

创建消息发布者服务,用于向指定频道发布消息:

package com.example.redispubsub.publisher;

import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * 消息发布者服务 - 负责向Redis频道发布消息
 *
 * @author example
 * @date 2024-02-27
 */
@Slf4j
@Service
public class MessagePublisher {

    // ========================================
    // 注入RedisTemplate,用于操作Redis
    // 注意:RedisTemplate是线程安全的,可以在多线程环境中共享使用
    // ========================================
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 发布消息到指定频道
     *
     * 工作原理:
     * 1. 使用RedisTemplate的convertAndSend方法发布消息
     * 2. 该方法会自动将消息对象序列化为字节数组
     * 3. 然后将消息发送到Redis服务器的指定频道
     * 4. 所有订阅该频道的订阅者都会收到该消息
     *
     * @param channel 频道名称(字符串)
     * @param message 要发布的消息对象(会被序列化为JSON)
     */
    public void publish(String channel, Message message) {
        try {
            // 记录日志:准备发布消息
            log.info("准备发布消息到频道 [{}]: {}", channel, JSON.toJSONString(message));
            
            // ========================================
            // 核心发布逻辑
            // ========================================
            // convertAndSend是RedisTemplate提供的方法,用于发布消息
            // 它会自动进行对象的序列化和反序列化
            redisTemplate.convertAndSend(channel, message);
            
            // 记录日志:消息发布成功
            log.info("消息发布成功到频道 [{}]", channel);
        } catch (Exception e) {
            // 记录错误日志
            log.error("消息发布失败: {}", e.getMessage(), e);
            // 实际项目中,这里可以添加重试逻辑或告警通知
        }
    }

    /**
     * 发布文本消息(便捷方法)
     * 自动创建Message对象并发布
     *
     * @param channel 频道名称
     * @param content 消息内容
     */
    public void publishText(String channel, String content) {
        // 创建消息对象:类型为TEXT,发送者为SYSTEM
        Message message = Message.create("TEXT", content, "SYSTEM");
        // 调用核心发布方法
        publish(channel, message);
    }
}

3.4 消息订阅者

创建消息订阅者,实现消息监听接口:

package com.example.redispubsub.subscriber;

import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 聊天消息订阅者 - 实现MessageListener接口,监听chat频道的消息
 *
 * 工作原理:
 * 1. 实现Spring Data Redis的MessageListener接口
 * 2. 当Redis服务器有消息发布到订阅的频道时,会自动调用onMessage方法
 * 3. onMessage方法在独立的线程中执行,不会阻塞主线程
 *
 * @author example
 * @date 2024-02-27
 */
@Slf4j
@Component
public class ChatMessageSubscriber implements MessageListener {

    /**
     * 消息接收回调方法(由Redis消息监听容器自动调用)
     *
     * @param message Redis消息对象,包含频道名和消息体
     * @param pattern 频道匹配模式(如果是模式订阅,则为匹配的模式;否则为null)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // ========================================
            // 步骤1:解析频道名称
            // ========================================
            String channel = new String(message.getChannel());
            log.debug("收到消息,来自频道: {}", channel);

            // ========================================
            // 步骤2:解析消息体
            // ========================================
            byte[] body = message.getBody();
            // 使用Fastjson反序列化为Message对象
            // 注意:需要确保发布的对象和接收的对象类型一致
            Message msg = JSON.parseObject(body, Message.class);

            // ========================================
            // 步骤3:记录消息详情
            // ========================================
            log.info("收到消息 - 频道: {}, 消息ID: {}, 类型: {}, 内容: {}, 发送者: {}, 时间: {}",
                    channel,
                    msg.getMessageId(),
                    msg.getMessageType(),
                    msg.getContent(),
                    msg.getSender(),
                    msg.getSendTime());

            // ========================================
            // 步骤4:处理业务逻辑
            // ========================================
            handleMessage(msg);

        } catch (Exception e) {
            // 异常处理:记录错误日志,避免因单个消息处理失败影响后续消息接收
            log.error("消息处理失败: {}", e.getMessage(), e);
            // 实际项目中,可以考虑将失败的消息记录到死信队列或数据库,以便后续重试
        }
    }

    /**
     * 处理消息业务逻辑
     * 根据不同的消息类型执行不同的业务处理
     *
     * @param message 消息对象
     */
    private void handleMessage(Message message) {
        // 根据消息类型进行分支处理
        switch (message.getMessageType()) {
            case "TEXT":
                // 处理文本消息:记录日志、保存到数据库等
                log.info("处理文本消息: {}", message.getContent());
                // TODO: 添加具体的业务逻辑,例如保存聊天记录
                break;

            case "ORDER":
                // 处理订单消息:更新订单状态、发送通知等
                log.info("处理订单消息: {}", message.getContent());
                // TODO: 添加订单相关的业务逻辑
                break;

            case "NOTIFICATION":
                // 处理通知消息:推送到前端、发送邮件等
                log.info("处理通知消息: {}", message.getContent());
                // TODO: 添加通知相关的业务逻辑
                break;

            default:
                // 未知消息类型:记录警告日志
                log.warn("未知消息类型: {}, 内容: {}", message.getMessageType(), message.getContent());
        }
    }
}

3.5 消息监听器配置

创建监听器配置类,将订阅者注册到监听容器中:

package com.example.redispubsub.config;

import com.example.redispubsub.subscriber.ChatMessageSubscriber;
import com.example.redispubsub.subscriber.OrderMessageSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * 消息监听器配置类 - 负责注册订阅者和频道的绑定关系
 *
 * @author example
 * @date 2024-02-27
 */
@Configuration
public class MessageListenerConfig {

    // ========================================
    // 注入订阅者实例(由Spring容器管理)
    // ========================================
    @Autowired
    private ChatMessageSubscriber chatMessageSubscriber;      // 聊天消息订阅者

    @Autowired(required = false)
    private OrderMessageSubscriber orderMessageSubscriber;    // 订单消息订阅者(可选)

    @Autowired(required = false)
    private NewsMessageSubscriber newsMessageSubscriber;      // 新闻消息订阅者(可选)

    /**
     * 配置Redis消息监听容器
     * 
     * 功能说明:
     * 1. RedisMessageListenerContainer是Spring Data Redis提供的消息监听容器
     * 2. 它会自动管理与Redis的连接,并启动独立的线程来接收消息
     * 3. 在容器启动时,会自动注册所有订阅者到对应的频道
     * 4. 当Redis服务器有消息发布到已订阅的频道时,容器会立即接收并调用对应的订阅者
     *
     * @param connectionFactory Redis连接工厂,由Spring自动注入
     * @return 配置好的消息监听容器
     */
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(
            RedisConnectionFactory connectionFactory) {

        // ========================================
        // 创建消息监听容器
        // ========================================
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // ========================================
        // 注册订阅者到对应的频道
        // ========================================
        
        // 注册聊天消息订阅者 - 监听"chat"频道
        // PatternTopic: 支持通配符模式的频道订阅,例如 "chat.*" 会匹配所有以chat.开头的频道
        container.addMessageListener(chatMessageSubscriber, new PatternTopic("chat"));
        System.out.println("[Redis Pub/Sub] 已注册订阅者: ChatMessageSubscriber -> 频道: chat");

        // 注册订单消息订阅者 - 监听"order"频道(如果存在)
        if (orderMessageSubscriber != null) {
            container.addMessageListener(orderMessageSubscriber, new PatternTopic("order"));
            System.out.println("[Redis Pub/Sub] 已注册订阅者: OrderMessageSubscriber -> 频道: order");
        }

        // 注册新闻消息订阅者 - 使用模式订阅,监听所有"news.*"开头的频道(如果存在)
        if (newsMessageSubscriber != null) {
            // 模式订阅示例:news.sports、news.tech、news.weather 都会被监听到
            container.addMessageListener(newsMessageSubscriber, new PatternTopic("news.*"));
            System.out.println("[Redis Pub/Sub] 已注册订阅者: NewsMessageSubscriber -> 模式: news.*");
        }

        // ========================================
        // 可选:设置线程池(提升性能)
        // ========================================
        // 如果需要自定义线程池,可以取消下面注释
        /*
        container.setTaskExecutor(Executors.newFixedThreadPool(10));
        */

        return container;
    }
}

四、创建测试接口

4.1 发布消息接口

创建Controller,提供HTTP接口用于发布消息:

package com.example.redispubsub.controller;

import com.example.redispubsub.model.Message;
import com.example.redispubsub.publisher.MessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
 * 消息发布控制器
 *
 * @author example
 * @date 2024-02-27
 */
@Slf4j
@RestController
@RequestMapping("/api/message")
public class MessageController {

    @Autowired
    private MessagePublisher messagePublisher;

    /**
     * 发布文本消息到chat频道
     *
     * @param content 消息内容
     * @return 发布结果
     */
    @PostMapping("/publish")
    public Map<String, Object> publishMessage(@RequestParam String content) {
        Map<String, Object> result = new HashMap<>();

        try {
            messagePublisher.publishText("chat", content);
            result.put("code", 200);
            result.put("message", "消息发布成功");
            result.put("data", content);
        } catch (Exception e) {
            log.error("消息发布失败", e);
            result.put("code", 500);
            result.put("message", "消息发布失败: " + e.getMessage());
        }

        return result;
    }

    /**
     * 发布自定义消息
     *
     * @param message 消息对象
     * @return 发布结果
     */
    @PostMapping("/publish/custom")
    public Map<String, Object> publishCustomMessage(@RequestBody Message message) {
        Map<String, Object> result = new HashMap<>();

        try {
            messagePublisher.publish("chat", message);
            result.put("code", 200);
            result.put("message", "消息发布成功");
            result.put("data", message);
        } catch (Exception e) {
            log.error("消息发布失败", e);
            result.put("code", 500);
            result.put("message", "消息发布失败: " + e.getMessage());
        }

        return result;
    }

    /**
     * 向指定频道发布消息
     *
     * @param channel 频道名称
     * @param content 消息内容
     * @return 发布结果
     */
    @PostMapping("/publish/{channel}")
    public Map<String, Object> publishToChannel(
            @PathVariable String channel,
            @RequestParam String content) {

        Map<String, Object> result = new HashMap<>();

        try {
            messagePublisher.publishText(channel, content);
            result.put("code", 200);
            result.put("message", "消息发布到频道 [" + channel + "] 成功");
            result.put("data", content);
        } catch (Exception e) {
            log.error("消息发布失败", e);
            result.put("code", 500);
            result.put("message", "消息发布失败: " + e.getMessage());
        }

        return result;
    }
}

五、启动类

创建Spring Boot启动类:

package com.example.redispubsub;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Redis Pub/Sub Demo 启动类
 *
 * @author example
 * @date 2024-02-27
 */
@SpringBootApplication
public class RedisPubSubApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisPubSubApplication.class, args);
        System.out.println("========================================");
        System.out.println("Redis Pub/Sub Demo 启动成功!");
        System.out.println("访问地址: http://localhost:8080");
        System.out.println("========================================");
    }
}

六、测试验证

6.1 启动Redis服务

确保Redis服务已启动,可以使用以下命令检查:

# 检查Redis服务状态
redis-cli ping

# 如果返回 PONG,说明Redis服务正常

6.2 启动应用

运行RedisPubSubApplicationmain方法,启动Spring Boot应用。

6.3 测试发布消息

方式1:使用curl命令

# 发布简单文本消息
curl -X POST "http://localhost:8080/api/message/publish?content=Hello%20Redis%20Pub/Sub"

# 发布到指定频道
curl -X POST "http://localhost:8080/api/message/publish/order?content=订单创建成功"

方式2:使用Postman

请求1:发布简单文本消息

请求2:发布自定义消息

{
  "messageId": "1001",
  "messageType": "ORDER",
  "content": "订单号: ORDER-20240227-001",
  "sender": "ORDER_SERVICE",
  "sendTime": "2024-02-27T10:30:00"
}

6.4 查看日志输出

应用启动后,当有消息发布到订阅的频道时,控制台会输出类似以下日志:

收到消息 - 频道: chat, 消息ID: 1709025600000, 类型: TEXT, 内容: 测试消息, 发送者: SYSTEM, 时间: 2024-02-27T10:30:00
处理文本消息: 测试消息

七、进阶功能

7.1 多订阅者示例

创建多个订阅者,订阅不同的频道:

package com.example.redispubsub.subscriber;

import com.alibaba.fastjson.JSON;
import com.example.redispubsub.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 订单消息订阅者 - 监听order频道的订单相关消息
 * 
 * 实际应用场景:
 * - 当订单创建、支付、发货时,发布消息到order频道
 * - 本订阅者接收到订单消息后,进行相应的业务处理
 * 
 * @author example
 * @date 2024-02-27
 */
@Slf4j
@Component
public class OrderMessageSubscriber implements MessageListener {

    /**
     * 订单消息接收回调方法
     * 
     * @param message Redis消息对象
     * @param pattern 频道匹配模式
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // 获取频道名称
            String channel = new String(message.getChannel());
            
            // 反序列化消息对象
            Message msg = JSON.parseObject(message.getBody(), Message.class);

            log.info("[订单订阅者] 收到订单消息 - 频道: {}, 消息ID: {}, 内容: {}",
                    channel, msg.getMessageId(), msg.getContent());

            // ========================================
            // 订单业务处理逻辑
            // ========================================
            handleOrderMessage(msg);

        } catch (Exception e) {
            log.error("[订单订阅者] 消息处理失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 处理订单业务逻辑
     * 
     * @param message 订单消息
     */
    private void handleOrderMessage(Message message) {
        String content = message.getContent();
        
        // 根据订单内容判断订单类型(实际项目中可以定义更复杂的消息结构)
        if (content.contains("创建")) {
            log.info("[订单订阅者] 处理订单创建: {}", content);
            // TODO: 1. 订单数据入库
            // TODO: 2. 发送订单创建通知
            // TODO: 3. 触发库存扣减逻辑
        } else if (content.contains("支付")) {
            log.info("[订单订阅者] 处理订单支付: {}", content);
            // TODO: 1. 更新订单状态为已支付
            // TODO: 2. 触发发货流程
            // TODO: 3. 发送支付成功通知
        } else if (content.contains("发货")) {
            log.info("[订单订阅者] 处理订单发货: {}", content);
            // TODO: 1. 更新订单状态为已发货
            // TODO: 2. 发送物流信息给客户
        }
    }
}

在配置类中注册:

@Configuration
public class MessageListenerConfig {

    @Autowired
    private ChatMessageSubscriber chatMessageSubscriber;

    @Autowired
    private OrderMessageSubscriber orderMessageSubscriber;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 订阅chat频道
        container.addMessageListener(chatMessageSubscriber, new PatternTopic("chat"));

        // 订阅order频道
        container.addMessageListener(orderMessageSubscriber, new PatternTopic("order"));

        return container;
    }
}

7.2 模式订阅

使用模式订阅,订阅匹配的多个频道:

/**
 * 新闻消息订阅者 - 使用模式订阅
 */
@Slf4j
@Component
public class NewsMessageSubscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        Message msg = JSON.parseObject(message.getBody(), Message.class);

        log.info("[新闻订阅者] 频道: {}, 新闻: {}", channel, msg.getContent());
    }
}

配置模式订阅:

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
        RedisConnectionFactory connectionFactory) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    // 模式订阅:订阅所有news.*开头的频道
    container.addMessageListener(newsMessageSubscriber, new PatternTopic("news.*"));

    return container;
}

八、注意事项

8.1 Redis发布订阅的局限性

消息不持久化

无消息确认机制

扩展性限制

8.2 生产环境建议

如果需要消息可靠性和持久化,建议使用以下方案:

Redis Streams(Redis 5.0+)

专业消息队列

8.3 性能优化

合理设置连接池

消息序列化优化

批量处理

九、完整项目结构

redis-pubsub-demo/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/example/redispubsub/
│   │   │       ├── config/
│   │   │       │   ├── MessageListenerConfig.java
│   │   │       │   └── RedisConfig.java
│   │   │       ├── controller/
│   │   │       │   └── MessageController.java
│   │   │       ├── model/
│   │   │       │   └── Message.java
│   │   │       ├── publisher/
│   │   │       │   └── MessagePublisher.java
│   │   │       ├── subscriber/
│   │   │       │   ├── ChatMessageSubscriber.java
│   │   │       │   ├── OrderMessageSubscriber.java
│   │   │       │   └── NewsMessageSubscriber.java
│   │   │       └── RedisPubSubApplication.java
│   │   └── resources/
│   │       └── application.yml
│   └── test/
│       └── java/
└── pom.xml

十、总结

本文详细介绍了在Spring Boot中集成Redis实现发布订阅功能的完整流程,包括:

  1. 项目搭建:Maven依赖配置、Redis连接配置
  2. 核心实现:Redis配置类、消息实体、发布者、订阅者
  3. 测试验证:启动应用、发布消息、查看日志
  4. 进阶功能:多订阅者、模式订阅
  5. 注意事项:局限性和生产环境建议

适用场景

Redis发布订阅是一个轻量级的消息传递方案,适合对消息可靠性要求不高、但需要实时性的场景。如果需要更强的消息可靠性,建议使用Redis Streams或专业消息队列。

以上就是基于SpringBoot+Redis实现一个完整的发布订阅系统的详细内容,更多关于SpringBoot Redis发布订阅功能的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文