java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot Redis Stream轻量消息队列

SpringBoot使用Redis Stream实现轻量消息队列的示例代码

作者:少年酱105974

Redis Stream 是 Redis 5.0 引入的一种数据结构,用于处理日志类型的数据,它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等,本文介绍了SpringBoot使用Redis Stream实现轻量消息队列,需要的朋友可以参考下

引言

Redis Stream 是 Redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 Kafka 和类似的消息队列系统,且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。

依赖

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-data-redis</artifactId>  
</dependency>

说明:此部分定义了 Redis 相关的依赖,确保项目能够引入并使用 Spring Boot 提供的 Redis 启动器。

RedisTemplate 配置

package com.mjg.config;  

import com.fasterxml.jackson.annotation.JsonAutoDetect;  
import com.fasterxml.jackson.annotation.PropertyAccessor;  
import com.fasterxml.jackson.databind.ObjectMapper;  
import com.fasterxml.jackson.databind.SerializationFeature;  
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.data.redis.connection.RedisConnectionFactory;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;  
import org.springframework.data.redis.serializer.StringRedisSerializer;  

@Configuration  
public class RedisConfig {  

    @Bean  
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {  
        RedisTemplate<String, Object> template = new RedisTemplate<>();  
        template.setConnectionFactory(connectionFactory);  
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);  
        ObjectMapper om = new ObjectMapper();  
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);  
//        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);  
        // 注册 Java 8 日期时间模块  
        om.registerModule(new JavaTimeModule());  
        om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);  
        om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);  
        jackson2JsonRedisSerializer.serialize(om);  
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();  
        // key 采用 String 的序列化方式  
        template.setKeySerializer(stringRedisSerializer);  
        // hash 的 key 也采用 String 的序列化方式  
        template.setHashKeySerializer(stringRedisSerializer);  
        // value 序列化方式采用 jackson  
        template.setValueSerializer(jackson2JsonRedisSerializer);  
        // hash 的 value 序列化方式采用 jackson  
        template.setHashValueSerializer(jackson2JsonRedisSerializer);  
        template.afterPropertiesSet();  
        return template;  
    }  
}

说明:此配置类用于设置 RedisTemplate 的序列化方式,以满足不同数据类型的存储和读取需求。

RedisStreamConfig

package com.mjg.config;  

import cn.hutool.core.convert.Convert;  
import cn.hutool.core.util.StrUtil;  
import lombok.RequiredArgsConstructor;  
import lombok.SneakyThrows;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.beans.factory.DisposableBean;  
import org.springframework.beans.factory.InitializingBean;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.data.redis.connection.RedisConnectionFactory;  
import org.springframework.data.redis.connection.RedisServerCommands;  
import org.springframework.data.redis.connection.stream.Consumer;  
import org.springframework.data.redis.connection.stream.ObjectRecord;  
import org.springframework.data.redis.connection.stream.ReadOffset;  
import org.springframework.data.redis.connection.stream.StreamOffset;  
import org.springframework.data.redis.core.RedisCallback;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.core.StreamOperations;  
import org.springframework.data.redis.stream.StreamListener;  
import org.springframework.data.redis.stream.StreamMessageListenerContainer;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
import org.springframework.util.Assert;  

import java.net.InetAddress;  
import java.time.Duration;  
import java.util.Properties;  

@Slf4j  
@RequiredArgsConstructor  
@Configuration  
public class RedisStreamConfig implements InitializingBean, DisposableBean {  
    private final RedisTemplate<String, Object> redisTemplate;  

    public static String streamName = "user-event-stream";  
    public static String userEventGroup = "user-event-group";  
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;  

    /**  
     * 消息侦听器容器,用于监听 Redis Stream 中的消息  
     *  
     * @param connectionFactory Redis 连接工厂,用于创建 Redis 连接  
     * @param messageConsumer   消息消费者,用于处理接收到的消息  
     * @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 类型的消息侦听器容器  
     */  
    @Bean  
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {  
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer);  
        listenerContainer.start();  
        return listenerContainer;  
    }  

    /**  
     * 创建一个流容器,用于监听 Redis Stream 中的数据  
     *  
     * @param streamName        Redis Stream 的名称  
     * @param connectionFactory Redis 连接工厂  
     * @param streamListener    绑定的监听类  
     * @return 返回 StreamMessageListenerContainer 对象  
     */  
    @SneakyThrows  
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {  
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =  
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions  
                      .builder()  
                      .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间  
                      .batchSize(10) // 批量抓取消息  
                      .targetType(String.class) // 传递的数据类型  
                      .executor(threadPoolTaskExecutor)  
                      .build();  
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer  
              .create(connectionFactory, options);  
        // 指定消费最新的消息  
        StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());  
        // 创建消费者  
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener);  
        // 指定消费者对象  
        container.register(streamReadRequest, streamListener);  
        return container;  
    }  

    /**  
     * 生成流读取请求  
     *  
     * @param offset         偏移量,用于指定从 Redis Stream 中的哪个位置开始读取消息  
     * @param streamListener 流侦听器,用于处理接收到的消息  
     * @return 返回一个 StreamReadRequest 对象,表示一个流读取请求  
     * @throws Exception 当 streamListener 无法识别为 MessageConsumer 类型时,抛出异常  
     */  
    private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {  
        Consumer consumer;  
        if (streamListener instanceof MessageConsumer) {  
            consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());  
        } else {  
            throw new Exception("无法识别的 stream key");  
        }  
        // 关闭自动 ack 确认  
        return StreamMessageListenerContainer.StreamReadRequest.builder(offset)  
              .errorHandler((error) -> {  
                    log.error(error.getMessage());  
                })  
              .cancelOnError(e -> false)  
              .consumer(consumer)  
                // 关闭自动 ack 确认  
              .autoAcknowledge(false)  
              .build();  
    }  

    /**  
     * 检查 Redis 版本是否符合要求  
     *  
     * @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,抛出该异常  
     */  
    private void checkRedisVersion() {  
        // 获得 Redis 版本  
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);  
        Assert.notNull(info, "Redis info is null");  
        Object redisVersion = info.get("redis_version");  
        Integer anInt = Convert.toInt(redisVersion);  
        if (anInt < 5) {  
            throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", redisVersion));  
        }  
    }  

    @Override  
    public void destroy() throws Exception {  
    }  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        checkRedisVersion();  
        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();  
        if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {  
            streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);  
        }  
    }  
}

说明:该配置类实现了对 Redis Stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、Redis 版本的检查以及组的创建等功能。

生产者

package com.mjg.config;  

import lombok.RequiredArgsConstructor;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.data.redis.connection.stream.RecordId;  
import org.springframework.data.redis.connection.stream.StreamRecords;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.stereotype.Component;  

import java.util.Collections;  

@Component  
@RequiredArgsConstructor  
@Slf4j  
public class MessageProducer {  

    private final RedisTemplate<String, Object> redisTemplate;  

    public void sendMessage(String streamKey, Object message) {  
        RecordId recordId = redisTemplate  
              .opsForStream().add(StreamRecords.newRecord()  
                      .ofMap(Collections.singletonMap("data", message))  
                      .withStreamKey(streamKey));  
        if (recordId!= null) {  
            log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId);  
        }  
    }  
}

说明:MessageProducer 类负责向 Redis Stream 发送消息。

消费者

package com.mjg.config;  

import lombok.RequiredArgsConstructor;  
import org.springframework.data.redis.connection.stream.ObjectRecord;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.stream.StreamListener;  
import org.springframework.stereotype.Component;  

@RequiredArgsConstructor  
@Component  
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {  

    private final RedisTemplate<String, Object> redisTemplate;  

    @Override  
    public void onMessage(ObjectRecord<String, String> message) {  
        String stream = message.getStream();  
        String messageId = message.getId().toString();  
        String messageBody = message.getValue();  

        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);  
        System.out.println("Message body: " + messageBody);  

//        消息应答  
        redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId());  
    }  
}

说明:MessageConsumer 类实现了 StreamListener 接口,用于处理从 Redis Stream 接收到的消息,并进行相应的应答操作。

测试

@RequiredArgsConstructor  
@Slf4j  
@RestController  
public class MessageController {  
    public static String streamName = "user-event-stream";  
    private final MessageProducer messageProducer;  

    @GetMapping("/send")  
    public void send() {  
        messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());  
    }  
}

说明:MessageController 类中的 send 方法通过调用 MessageProducer 来发送消息到指定的 Redis Stream 中。

以上就是SpringBoot使用Redis Stream实现轻量消息队列的示例代码的详细内容,更多关于SpringBoot Redis Stream轻量消息队列的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文