Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis 批量发货状态缓存

Redis 在批量发货状态缓存与错误信息暂存中的应用小结

作者:霸道流氓气质

本文详细探讨了Redis在批量发货状态缓存和错误信息暂存中的应用,通过Redis的String和List数据结构,分别解决了发货状态标记和错误信息存储的问题,强调了Redis在高并发场景下的高效性和一致性保障,感兴趣的朋友跟随小编一起看看吧

一、批量发货状态缓存

1.1 解决什么问题

批量发货场景下,用户选择多条待发货单据一键发货,系统需要:

数据库加状态字段虽然也能做,但频繁读写、并发查询性能差。Redis 作为内存缓存,天然适合这种"短生命周期、高频读写"的临时状态管理。

1.2 核心设计思路

用户触发批量发货
    ↓
遍历每一条待发货单据
    ↓
检查 Redis key 是否存在(是否正在处理中)
    ├── 存在 → 跳过,返回"正在处理中"
    └── 不存在 → 写入 Redis key(标记占位) → 执行发货逻辑
                                                 ↓
                                         发货成功 → 删除 key 或设短过期
                                         发货失败 → 删除 key,返回错误

1.3 涉及的 Redis 数据结构和命令

操作Redis 命令说明
标记正在处理SET key value EX ttl NXNX 保证互斥,EX 防死锁
检查是否处理中GET key非空则跳过
处理完成清除DEL key释放标记
批量检查MGET key1 key2 ...一次查多个单据状态

1.4 代码示例(通用)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class BatchTaskService {
    private final StringRedisTemplate redisTemplate;
    // key 前缀
    private static final String PROCESSING_KEY_PREFIX = "batch:task:processing:";
    // 标记过期时间(防止异常未清除导致永久阻塞)
    private static final long EXPIRE_SECONDS = 300;
    public BatchTaskService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /**
     * 批量处理任务.
     *
     * @param taskIds 待处理的任务ID列表
     * @return 处理结果
     */
    public BatchResult processBatch(List<String> taskIds) {
        List<String> successList = new ArrayList<>();
        List<String> skippedList = new ArrayList<>();
        List<String> failedList = new ArrayList<>();
        for (String taskId : taskIds) {
            String redisKey = PROCESSING_KEY_PREFIX + taskId;
            // 尝试标记为处理中(SET NX EX:不存在才设置 + 过期时间)
            Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(redisKey, "processing", EXPIRE_SECONDS, TimeUnit.SECONDS);
            if (Boolean.FALSE.equals(acquired)) {
                // key 已存在,说明正在被其他线程/实例处理
                skippedList.add(taskId);
                continue;
            }
            try {
                // 执行业务逻辑
                doProcess(taskId);
                successList.add(taskId);
            } catch (Exception e) {
                failedList.add(taskId);
            } finally {
                // 无论成功失败都清除标记,允许后续重试
                redisTemplate.delete(redisKey);
            }
        }
        return new BatchResult(successList, skippedList, failedList);
    }
    /**
     * 查询哪些任务正在处理中(前端轮询用).
     */
    public List<String> listProcessingTasks(List<String> taskIds) {
        List<String> keys = taskIds.stream()
            .map(id -> PROCESSING_KEY_PREFIX + id)
            .collect(Collectors.toList());
        List<String> values = redisTemplate.opsForValue().multiGet(keys);
        List<String> processingIds = new ArrayList<>();
        for (int i = 0; i < taskIds.size(); i++) {
            if (values != null && values.get(i) != null) {
                processingIds.add(taskIds.get(i));
            }
        }
        return processingIds;
    }
    private void doProcess(String taskId) {
        // 具体业务逻辑...
    }
}

1.5 进阶:带进度的批量状态

如果前端需要展示"已处理 3/10"这样的进度,可以用 Redis Hash:

/**
 * 批量任务进度跟踪.
 */
@Service
public class BatchProgressService {
    private final StringRedisTemplate redisTemplate;
    private static final String PROGRESS_KEY_PREFIX = "batch:progress:";
    public BatchProgressService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /** 初始化批次进度. */
    public String initBatch(List<String> taskIds) {
        String batchId = UUID.randomUUID().toString();
        String progressKey = PROGRESS_KEY_PREFIX + batchId;
        // Hash 结构:field=taskId, value=状态
        Map<String, String> statusMap = taskIds.stream()
            .collect(Collectors.toMap(id -> id, id -> "PENDING"));
        redisTemplate.opsForHash().putAll(progressKey, statusMap);
        redisTemplate.expire(progressKey, 1, TimeUnit.HOURS);
        return batchId;
    }
    /** 更新单个任务状态. */
    public void updateStatus(String batchId, String taskId, String status) {
        String progressKey = PROGRESS_KEY_PREFIX + batchId;
        redisTemplate.opsForHash().put(progressKey, taskId, status);
    }
    /** 查询批次进度. */
    public Map<String, String> getProgress(String batchId) {
        String progressKey = PROGRESS_KEY_PREFIX + batchId;
        Map<Object, Object> entries = redisTemplate.opsForHash().entries(progressKey);
        return entries.entrySet().stream()
            .collect(Collectors.toMap(
                e -> e.getKey().toString(),
                e -> e.getValue().toString()
            ));
    }
}

1.6 关键设计考量

考量点说明
过期时间必须设置,防止应用崩溃后 key 永久残留阻塞后续操作
幂等性加锁失败时不报错而是跳过,配合重试机制实现最终一致
key 设计业务前缀:模块:唯一标识,如 batch:delivery:SO202401010001
清除时机成功/失败都删除(允许重试)或成功后保留短时间(防重复成功回调)
集群环境确保所有实例连同一个 Redis 集群,key 可见性一致

二、错误信息暂存

2.1 解决什么问题

自动发货场景下:

Redis List 结构完美匹配:左进右出、支持批量读取、设置过期自动清理。

2.2 核心设计思路

自动发货失败
    ↓
格式化错误信息(单号 + 客户码 + requestId + 失败原因)
    ↓
RPUSH 写入 Redis List
    ↓
设置 key 1天过期(EXPIRE)
    ↓
定时任务每分钟 LPOP/LRANGE 取出 → 推送钉钉/企微

2.3 涉及的 Redis 数据结构和命令

操作Redis 命令说明
追加错误RPUSH key messageList 尾部追加,O(1)
批量读取LRANGE key 0 -1取所有消息
逐条消费LPOP key从头部弹出一条
批量消费LPOP key count (Redis 6.2+)弹出 N 条
设置过期EXPIRE key seconds自动清理过期数据
查看队列长度LLEN key监控积压量

2.4 代码示例(通用)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * 异步任务错误信息暂存服务.
 * 用于暂存自动任务的失败信息,供定时任务消费后推送告警.
 */
@Service
public class ErrorMessageBufferService {
    private final StringRedisTemplate redisTemplate;
    // Redis key
    private static final String ERROR_BUFFER_KEY = "error:buffer:auto-task";
    // 过期时间(防止无人消费时无限增长)
    private static final long EXPIRE_HOURS = 24;
    // 单次消费最大条数
    private static final int BATCH_SIZE = 50;
    public ErrorMessageBufferService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /**
     * 写入错误信息.
     *
     * @param taskId    任务标识
     * @param errorMsg  错误原因
     * @param context   上下文信息(如 requestId)
     */
    public void recordError(String taskId, String errorMsg, String context) {
        String timestamp = LocalDateTime.now()
            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        // 格式化消息内容
        String message = String.format("[%s] 任务:%s, 上下文:%s, 失败原因:%s",
            timestamp, taskId, context, errorMsg);
        try {
            // 追加到 List 尾部
            redisTemplate.opsForList().rightPush(ERROR_BUFFER_KEY, message);
            // 刷新过期时间(每次写入都续期,保证最后一条写入后24小时过期)
            redisTemplate.expire(ERROR_BUFFER_KEY, EXPIRE_HOURS, TimeUnit.HOURS);
        } catch (Exception e) {
            // Redis 操作失败不影响主流程,仅记录日志
            log.warn("错误信息写入Redis失败, taskId={}", taskId, e);
        }
    }
    /**
     * 批量消费错误信息(定时任务调用).
     *
     * @return 本次消费的错误消息列表
     */
    public List<String> consumeErrors() {
        // 先查长度,取 min(length, BATCH_SIZE) 条
        Long length = redisTemplate.opsForList().size(ERROR_BUFFER_KEY);
        if (length == null || length == 0) {
            return List.of();
        }
        int count = (int) Math.min(length, BATCH_SIZE);
        // LRANGE 读取 + LTRIM 删除(两步操作,实际可用 Lua 保证原子性)
        List<String> messages = redisTemplate.opsForList().range(ERROR_BUFFER_KEY, 0, count - 1);
        redisTemplate.opsForList().trim(ERROR_BUFFER_KEY, count, -1);
        return messages;
    }
    /**
     * 查看当前积压的错误数量(监控用).
     */
    public long getPendingCount() {
        Long size = redisTemplate.opsForList().size(ERROR_BUFFER_KEY);
        return size != null ? size : 0;
    }
}

2.5 定时任务消费 + 推送告警

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * 定时消费错误缓存并推送告警.
 */
@Component
public class ErrorAlertJob {
    private final ErrorMessageBufferService errorBufferService;
    private final AlertService alertService;
    public ErrorAlertJob(ErrorMessageBufferService errorBufferService,
                         AlertService alertService) {
        this.errorBufferService = errorBufferService;
        this.alertService = alertService;
    }
    /**
     * 每分钟执行一次,消费错误消息并批量推送.
     */
    @Scheduled(fixedDelay = 60000)
    public void consumeAndAlert() {
        List<String> errors = errorBufferService.consumeErrors();
        if (errors.isEmpty()) {
            return;
        }
        // 拼接为一条告警消息
        StringBuilder content = new StringBuilder();
        content.append("⚠️ 自动任务失败告警(").append(errors.size()).append("条)\n\n");
        for (String error : errors) {
            content.append("• ").append(error).append("\n");
        }
        // 推送到钉钉/企微/飞书
        alertService.sendToWebhook(content.toString());
    }
}

2.6 原子消费的 Lua 脚本方案

LRANGE + LTRIM 两步操作在并发消费时可能丢数据或重复消费,用 Lua 脚本保证原子性:

/**
 * 原子性批量弹出 List 元素.
 */
public List<String> atomicBatchPop(String key, int count) {
    String luaScript =
        "local result = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1) " +
        "if #result > 0 then " +
        "  redis.call('ltrim', KEYS[1], #result, -1) " +
        "end " +
        "return result";
    DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
    List<String> result = redisTemplate.execute(
        script, Collections.singletonList(key), String.valueOf(count));
    return result != null ? result : List.of();
}

2.7 使用模板(配置化消息格式)

项目中通过数据库/Redis 存储消息模板,运行时动态填充参数:

/**
 * 模板化错误消息.
 */
public class ErrorMessageTemplate {
    /**
     * 根据模板和参数生成最终消息.
     *
     * @param template 模板,如 "任务{1}失败,客户{2},原因:{3}"
     * @param args     参数数组
     * @return 填充后的消息
     */
    public static String format(String template, String[] args) {
        String result = template;
        for (int i = 0; i < args.length; i++) {
            result = result.replace("{" + (i + 1) + "}", args[i]);
        }
        return result;
    }
}
// 使用
String template = configService.getTemplate("auto-delivery-error");
String message = ErrorMessageTemplate.format(template, new String[]{
    orderCode, sellerCode, requestId, exception.getMessage()
});
errorBufferService.recordError(orderCode, message, requestId);

三、两种场景的对比

维度批量状态缓存错误信息暂存
数据结构String(每个任务一个 key)List(所有错误共用一个 key)
生命周期任务处理完即删除定时消费后删除,或自然过期
并发模式写少读多(前端轮询)写多读少(定时消费)
一致性要求强(标记必须准确)弱(少量丢失可接受)
失败影响Redis 写失败可能导致重复处理Redis 写失败仅丢失告警,不影响主流程
典型 key 模式batch:delivery:{orderCode}error:buffer:auto-delivery

四、注意事项汇总

问题解决方案
Redis 宕机时标记丢失状态缓存:降级为数据库 SELECT FOR UPDATE;错误暂存:降级为本地日志
List 无限增长设置 EXPIRE + 消费任务监控积压量告警
并发消费重复单消费者模式,或用 Lua 原子弹出
消息格式变更模板配置化,修改模板不需要改代码和重启
跨机房同步错误暂存场景容忍延迟,可用异步复制;状态缓存需要同机房 Redis

到此这篇关于Redis 在批量发货状态缓存与错误信息暂存中的应用小结的文章就介绍到这了,更多相关Redis 批量发货状态缓存内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文