java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot Redis消息队列

SpringBoot集成Redis消息队列的实现示例

作者:sjsjsbbsbsn

本文主要介绍了SpringBoot集成Redis消息队列的实现示例,包括配置和消费逻辑,RedisStream提供了高吞吐量、顺序消费和消费组机制等优势,具有一定的参考价值,感兴趣的可以了解一下

一.Redis Stream 消息队列模版配置类

/**
 * Redis Stream 消息队列配置
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);
    private final RedisConnectionFactory redisConnectionFactory;
    private final Consumer1 Consumer1;
    private final Consumer2 Consumer2;

    // 定义需要自定义的配置常量
    private static final int BATCH_SIZE = 10; // 每次批量拉取的消息数量
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超时时间
    private static final String THREAD_NAME_PREFIX = "your-business"; // 线程名称前缀
    private static final String GROUP_NAME_1 = "group1"; // 第一个消费者组名称
    private static final String GROUP_NAME_2 = "group2"; // 第二个消费者组名称
    private static final String CONSUMER_NAME_1 = "consumer1"; // 第一个消费者名称
    private static final String CONSUMER_NAME_2 = "consumer2"; // 第二个消费者名称
    private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主题键

    @Bean
    public ExecutorService asyncStreamConsumer() {
        log.info("Redis Stream 消息队列配置线程池");
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();

        // 创建一个自定义线程池
        return new ThreadPoolExecutor(
                processors,
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
            ExecutorService asyncStreamConsumer) {

        // 配置 StreamMessageListenerContainer 容器选项
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(BATCH_SIZE) // 批量拉取消息数量
                        .executor(asyncStreamConsumer) // 使用配置好的线程池
                        .pollTimeout(POLL_TIMEOUT) // 拉取消息的超时时间
                        .build();

        // 创建 StreamMessageListenerContainer 实例
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 配置第一个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer1 // 指定第一个消息处理逻辑
        );

        // 配置第二个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer2 // 指定第二个消息处理逻辑
        );

        return container;
    }
}

1. 介绍

RedisStreamConfiguration 是一个用于配置 Redis Stream 消息队列的 Spring 配置类。它通过 Redis Stream 实现消息的异步处理和多消费者消费,适用于需要高吞吐量、低延迟的业务场景。

2. 关键组件和自定义参数

此类主要配置了 Redis Stream 消息监听容器 StreamMessageListenerContainer,包括线程池配置、消费批次和超时时间等,方便用户根据业务需求自定义。

核心参数

代码实现

配置了 StreamMessageListenerContainer 来处理 Stream 消息,并分别为两个消费者组和消费者注册不同的监听器。

3. 主要方法说明

ExecutorService(线程池配置)

@Bean
public ExecutorService asyncStreamConsumer() { ... }

用于创建一个自定义线程池,为 Redis Stream 的消息消费提供异步执行环境。processors 设置了核心线程数为 CPU 核心数,最大线程数为 processors + (processors >> 1),即核心数的 1.5 倍。线程命名使用 THREAD_NAME_PREFIX 前缀,方便日志记录和排查问题。

StreamMessageListenerContainer(消息监听容器)

@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }

该方法创建并配置了 Redis Stream 的监听容器。关键步骤如下:

4. 应用场景

此配置适用于 Redis Stream 在大规模并发场景下的消息队列管理。通过灵活配置多个消费者组和消费者,可以实现负载均衡的多线程消费逻辑。

二.消费者模版

/**
 * 消息队列消费者
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
   
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
            // 判断当前的这个消息流程是否执行完成
            if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
                return;
            }
            throw new ServiceException("消息未完成流程,需要消息队列重试");
        }
        try {
            Map<String, String> producerMap = message.getValue();
         	//你自己的业务逻辑
            }
            // 删除消息
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        } catch (Throwable ex) {
            messageQueueIdempotentHandler.delMessageProcessed(id.toString());
            log.error("消费异常", ex);
            throw ex;
        }
        //消费完删除
        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }

   
}

本模板实现了一个 Redis Stream 消息队列消费者的基础结构。该模板主要围绕幂等性检查、消息解析与处理以及消费状态管理三个核心功能,确保消息在高并发环境下的安全性与一致性。 

具体的幂等校验看我另一篇文章

三.生产者模版

/**
 * 短链接监控状态保存消息队列生产者
 */
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer{

    private final StringRedisTemplate stringRedisTemplate;


    /**
     * 发送消息
     */
    public void send(Map<String, String> producerMap) {
        stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
    }
}

注意YOUR_KEY 替换成你自己的即可

四.总结

1. Redis Stream 消息队列的优势

Redis Stream 是 Redis 提供的一种强大的消息队列解决方案,适用于高吞吐量、低延迟的业务场景。与传统的消息队列系统(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成与配置方面更加简单,尤其适合基于 Redis 的应用程序。Redis Stream 提供了以下优势:

2. Redis Stream 配置与应用

本文介绍了如何在 Spring Boot 中集成 Redis Stream 消息队列的配置与消费逻辑,主要包括:

3. 消费者与生产者模板

4. 应用场景

Redis Stream 适用于许多场景,特别是需要高并发、高吞吐量且保证顺序消费的业务需求。例如,短链接生成与访问统计、订单处理、日志收集等业务场景,都能通过 Redis Stream 实现高效、可靠的消息队列。

到此这篇关于SpringBoot集成Redis消息队列的实现示例的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文