Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis管道操作pipeline

Redis中管道操作pipeline的实现

作者:烟火缠过客

RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操作pipeline的实现,具有一定的参考价值,感兴趣的可以了解一下

什么是pipeline

在 Redis 中,Pipeline(管道)是一种客户端与服务器间通信的优化机制,旨在减少网络往返时间和提高命令执行效率。以下是 Redis Pipeline 的具体定义和特点:

1.批量发送与接收:

2.异步执行

3.命令隔离

4.使用场景

5.注意事项

总结来说,Redis Pipeline 是一种客户端与服务器间高效通信的技术,通过批量发送和接收命令,减少网络往返次数,提高命令执行效率,尤其适用于大量命令操作的场景。在使用时需注意命令打包大小的控制以及错误处理。

场景一:我要向redis新增大批量的数据

Redis Pipeline允许一次性发送多个命令到Redis服务器,而无需等待每个命令的响应,显著减少了网络往返时间和潜在的延迟。在Spring Boot应用中,可以使用RedisTemplate的executePipelined()方法实现:

@Autowired
private StringRedisTemplate redisTemplate

public void batchInsertUsersWithPipeline(List<User> users, String keyPrefix, long ttlSeconds) {
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (User user : users) {
            String key = generateKey(keyPrefix, user.getId());
            String value = objectMapper.writeValueAsString(user);

            connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
        }
        return null;
    });
}    

分批处理

尽管Pipeline提高了效率,但对于千万级数据,一次性发送所有命令可能导致内存溢出或网络阻塞。因此,建议将数据分批处理,每批包含适量的记录(如1000条),逐批发送至Redis:

public void insertUsersInBatches(List<User> users, String keyPrefix, long ttlSeconds, int batchSize) {
    int start = 0;
    while (start < users.size()) {
        int end = Math.min(start + batchSize, users.size());
        List<User> batch = users.subList(start, end);
        batchInsertUsersWithPipeline(batch, keyPrefix, ttlSeconds);
        start = end;
    }
}

batchInsertUsersWithPipeline方法利用Redis Pipeline机制发送批量命令,可以在一定程度上提高插入操作的并发性,减少网络往返时间和整体耗时。然而,Pipeline本身并不能严格保证所有命令同时成功或失败,其主要特性如下:

1.原子性:

2.响应顺序

3.故障处理

综上所述,batchInsertUsersWithPipeline方法不能严格保证所有命令同时成功或失败。在实际使用中,如果需要确保一批数据要么全部成功插入,要么全部失败回滚,可以采取以下策略:

事务( MULTI/EXEC/DISCARD ):

Lua脚本:

batchInsertUsersWithPipeline方法中的connection中各个方法的区别是什么?

1.connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());

这一行调用了RedisConnection的setEx方法,用于设置一个带有过期时间(Time To Live,TTL)的键值对。参数说明如下:

setEx方法确保在设置键值对的同时为其设定一个过期时间。如果键已经存在,该方法会更新键的值和过期时间。这个操作在Pipeline模式下是原子的,即在同一时刻只有一个setEx命令被执行。

2.connection.multi(); 和 connection.exec();

这两个方法涉及Redis的事务(Transaction)功能。在Pipeline模式下,由于我们希望保持较高的性能,一般不会使用这两个方法。但如果确实需要保证一批命令的原子性,可以使用如下方式:

在您的batchInsertUsersWithPipeline方法中并没有使用multi和exec,因为Pipeline已经提供了高效的批量执行机制,而且这里的目的是提高插入性能,而不是实现严格的事务行为。

综上所述,batchInsertUsersWithPipeline方法中直接使用了setEx方法,利用Pipeline来高效地批量插入带有过期时间的键值对。如果需要实现更严格的事务控制,应考虑使用Redis的事务(MULTI/EXEC)或Lua脚本,但这通常会牺牲一定的性能,并且与Pipeline机制不完全兼容。在实际应用场景中,应根据业务需求权衡选择合适的操作方式。

3.connection.set()和connection.setNx有什么区别

connection.set() 和 connection.setNx() 都是Redis的键值对设置方法,它们的主要区别在于是否存在条件以及对已有键的处理方式:

1.connection.set(key, value)

这是最基础的设置键值对的方法,无论键是否存在,都会直接覆盖(或创建)对应的键值对。参数说明如下:

行为特点:

2.connection.setNx(key, value)

这是带有条件的设置键值对方法,仅当键不存在时才会设置键值对。参数与set()相同:

行为特点

总结来说,connection.set()无条件地设置或更新键值对,而connection.setNx()则是在键不存在时才设置键值对,如果键已存在,则不会执行任何操作。前者适用于常规的键值更新或插入,后者常用于实现锁机制、唯一性检查等场景,确保某个键的值只在首次设置时有效。在您的batchInsertUsersWithPipeline方法中,由于目标是批量插入新数据,所以使用了setEx方法(带有过期时间的set),确保每个用户数据作为一个新的键值对被添加到Redis中。如果您需要在插入前检查键的唯一性,可以考虑使用setNx方法。不过,对于批量插入场景,通常假设数据是新的且键不存在,因此直接使用setEx更为常见。

场景二:大批量删除redis中的数据

public void batchDeleteKeysWithPipeline(List<String> keys) {
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (String key : keys) {
            connection.del(key.getBytes());
        }
        return null;
    });
}

场景三:删除redis中千万级别的数据

1.批量删除策略

2.并行处理

3.Redis 客户端优化:

4.监控与故障恢复:

基于Jedis客户端实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

public class RedisDataDeleter {

    private static final int SCAN_BATCH_SIZE = 1000; // 可根据实际情况调整
    private static final String MATCH_PATTERN = "*"; // 匹配所有键

    public void deleteAllKeys(Jedis jedis) {
        ScanParams scanParams = new ScanParams().count(SCAN_BATCH_SIZE).match(MATCH_PATTERN);

        String cursor = "0";
        while (true) {
            ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();

            List<String> keysToDelete = scanResult.getResult();
            if (!keysToDelete.isEmpty()) {
                // 使用 Pipeline 批量删除键
                Pipeline pipeline = jedis.pipelined();
                for (String key : keysToDelete) {
                    pipeline.del(key);
                }
                pipeline.sync(); // 执行批量命令
            }

            if ("0".equals(cursor)) {
                break; // 扫描完成
            }
        }
    }
}

注意

如果条件允许,建议升级到 Redis 6.x 版本,并启用 activedefrag 配置项,有助于在删除大量数据后及时进行碎片整理,保持 Redis 内存的高效利用。同时,监控 Redis 的内存使用情况和碎片率,必要时手动触发 BGREWRITEAOF 或 BGSAVE 操作。

maven

<dependencies>
    <!-- ... 其他依赖 ... -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.7.0</version> <!-- 根据实际版本号调整 -->
    </dependency>
</dependencies>

jedis连接池配置

spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=mysecretpassword  # 如果有密码,请填写

# Jedis 连接池配置
spring.redis.jedis.pool.max-active=10
spring.redis.jedis.pool.max-idle=6
spring.redis.jedis.pool.min-idle=2
spring.redis.jedis.pool.max-wait=2000ms

jedisConfig

@Configuration
public class JedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public JedisPool jedisPool() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-active")));
        poolConfig.setMaxIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-idle")));
        poolConfig.setMinIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.min-idle")));
        poolConfig.setMaxWaitMillis(Long.parseLong(env.getProperty("spring.redis.jedis.pool.max-wait")));

        return new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, password);
    }
}

实现 Redis 数据删除服务

@Service
public class RedisDataDeleterService {

    @Autowired
    private JedisPool jedisPool;

    public void deleteAllKeys() {
        try (Jedis jedis = jedisPool.getResource()) {
            ScanParams scanParams = new ScanParams().match("*").count(1000);

            String cursor = "0";
            while (true) {
                ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
                cursor = scanResult.getCursor();

                List<String> keysToDelete = scanResult.getResult();
                if (!keysToDelete.isEmpty()) {
                    Pipeline pipeline = jedis.pipelined();
                    for (String key : keysToDelete) {
                        pipeline.del(key);
                    }
                    pipeline.sync();
                }

                if ("0".equals(cursor)) {
                    break;
                }
            }
        }
    }
}

调用删除服务

@RestController
@RequestMapping("/redis")
public class RedisController {

    @Autowired
    private RedisDataDeleterService redisDataDeleterService;

    @GetMapping("/delete-all-keys")
    public ResponseEntity<?> deleteAllKeys() {
        redisDataDeleterService.deleteAllKeys();
        return ResponseEntity.ok().build();
    }
}

基于Lettuce

maven

<dependencies>
    <!-- ... 其他依赖 ... -->
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.2.¼</version> <!-- 根据实际版本号调整 -->
    </dependency>
</dependencies>

配置Lettuce

Spring Boot 自动配置会为 Lettuce 提供连接池支持。在 application.properties 或 application.yml 中配置 Redis 连接信息:

spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=mysecretpassword  # 如果有密码,请填写

使用 Lettuce 客户端执行批量删除操作:

@Service
public class RedisDataDeleterService {

    @Autowired
    private RedisConnectionFactory connectionFactory;

    public void deleteAllKeys() {
        RedisAsyncCommands<String, String> asyncCommands = connectionFactory.getConnection().async();

        ScanArgs scanArgs = ScanArgs.Builder.matches("*").count(1000);
        RedisFuture<ScanResult<String>> scanFuture = asyncCommands.scan(ScanCursor.INITIAL, scanArgs);

        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicReference<ScanCursor> lastCursor = new AtomicReference<>(ScanCursor.INITIAL);

        // 异步处理扫描结果
        scanFuture.thenAccept(scanResult -> {
            lastCursor.set(scanResult.getCursor());
            List<String> keysToDelete = scanResult.getKeys();
            if (!keysToDelete.isEmpty()) {
                RedisFuture<Long> delFuture = asyncCommands.del(keysToDelete.toArray(new String[0]));
                delFuture.thenAccept(count -> {
                    if (isRunning.get()) {
                        // 如果仍在运行,继续扫描
                        deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
                    }
                });
            } else {
                isRunning.set(false);
            }
        });

        // 设置超时时间(可根据实际情况调整)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(120000); // 2分钟超时
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            isRunning.set(false);
        });
    }

    private void deleteAllKeysRecursive(RedisAsyncCommands<String, String> asyncCommands,
                                       ScanArgs scanArgs,
                                       AtomicReference<ScanCursor> lastCursor,
                                       AtomicBoolean isRunning) {
        if (isRunning.get()) {
            asyncCommands.scan(lastCursor.get(), scanArgs).thenAccept(scanResult -> {
                lastCursor.set(scanResult.getCursor());
                List<String> keysToDelete = scanResult.getKeys();
                if (!keysToDelete.isEmpty()) {
                    asyncCommands.del(keysToDelete.toArray(new String[0])).thenAccept(count -> {
                        if (isRunning.get()) {
                            deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
                        }
                    });
                } else {
                    isRunning.set(false);
                }
            });
        }
    }
}

调用

@RestController
@RequestMapping("/redis")
public class RedisController {

    @Autowired
    private RedisDataDeleterService redisDataDeleterService;

    @GetMapping("/delete-all-keys")
    public ResponseEntity<?> deleteAllKeys() {
        redisDataDeleterService.deleteAllKeys();
        return ResponseEntity.ok().build();
    }
}

到此这篇关于Redis中管道操作pipeline的实现的文章就介绍到这了,更多相关Redis管道操作pipeline内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文