java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java线程池拒绝策略

Java线程池拒绝策略场景分析

作者:敖正炀

不同的业务场景对任务丢失的容忍度、响应延迟的要求、系统保护的需求各不相同,本文就来介绍一下Java线程池拒绝策略场景分析,感兴趣的可以了解一下

不同的业务场景对任务丢失的容忍度、响应延迟的要求、系统保护的需求各不相同。下面通过 6 个典型场景,分析如何选择合适的拒绝策略,并给出代码示例和注意事项。

场景1:电商订单支付(核心交易链路)

业务特点

压力情况:大促时瞬间流量激增,线程池可能饱和。

选择策略AbortPolicy + 上层统一捕获异常,进行异步重试或放入死信队列。

理由

代码示例

// 线程池配置
@Bean("paymentExecutor")
public Executor paymentExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        20, 50, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(200),
        new NamedThreadFactory("payment"),
        new ThreadPoolExecutor.AbortPolicy()  // 显式抛出异常
    );
    return executor;
}

// 业务调用处
@Service
public class PaymentService {
    @Autowired
    private ThreadPoolExecutor paymentExecutor;
    
    public void processPayment(PaymentRequest request) {
        try {
            paymentExecutor.execute(() -> doPayment(request));
        } catch (RejectedExecutionException e) {
            // 线程池饱和,将任务写入重试队列(如 RocketMQ、Redis 等)
            saveToRetryQueue(request);
            log.warn("Payment task rejected, saved to retry queue. requestId={}", request.getId());
            // 可选:向调用方返回“系统繁忙,稍后重试”的提示
        }
    }
}

监控指标:拒绝次数必须为 0,一旦出现立即告警并扩容。

场景2:秒杀扣库存(瞬时高并发,允许快速失败)

业务特点

压力情况:QPS 从几百瞬间飙升到几十万。

选择策略AbortPolicyDiscardPolicy + 前端友好提示。

理由

代码示例

// 秒杀专用线程池
int maxConcurrency = 200; // 根据压测得出系统能承受的最大并发扣库存操作
ExecutorService seckillExecutor = new ThreadPoolExecutor(
    0, maxConcurrency, 30L, TimeUnit.SECONDS,
    new SynchronousQueue<>(),
    new NamedThreadFactory("seckill"),
    new ThreadPoolExecutor.AbortPolicy()
);
// 秒杀接口
@PostMapping("/seckill")
public Result seckill(Long goodsId, Long userId) {
    try {
        seckillExecutor.execute(() -> {
            // 扣库存、创建订单等核心操作
            inventoryService.decr(goodsId);
            orderService.create(userId, goodsId);
        });
        return Result.success("抢购中,请稍后查看订单");
    } catch (RejectedExecutionException e) {
        // 线程池满,直接返回失败
        return Result.error("很遗憾,您没抢到,下次加油");
    }
}

优化点:可以在拒绝策略中直接记录指标,但无需重试,因为秒杀失败就是最终结果。

场景3:异步发送短信/邮件(非关键通知,允许少量丢失)

业务特点

压力情况:业务高峰时消息量较大,但系统可以接受一定程度的丢弃。

选择策略DiscardOldestPolicyDiscardPolicy + 日志记录。

理由

代码示例

// 通知线程池
ThreadPoolExecutor notifyExecutor = new ThreadPoolExecutor(
    5, 20, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(500),
    new NamedThreadFactory("notify"),
    new ThreadPoolExecutor.DiscardOldestPolicy()
);
// 发送短信
public void sendSms(String phone, String content) {
    notifyExecutor.execute(() -> {
        try {
            smsClient.send(phone, content);
        } catch (Exception e) {
            log.error("Send sms failed, phone={}", phone, e);
            // 可选:记录失败到数据库,由定时任务补偿
        }
    });
}

监控:可以统计丢弃数量,如果丢弃率过高(如 >1%),考虑扩容或优化短信通道。

场景4:日志/审计记录(海量低价值,可丢弃)

业务特点

压力情况:持续高吞吐,磁盘或网络可能成为瓶颈。

选择策略DiscardPolicy(静默丢弃)。

理由

代码示例

ThreadPoolExecutor logExecutor = new ThreadPoolExecutor(
    2, 10, 10L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new NamedThreadFactory("access-log"),
    new ThreadPoolExecutor.DiscardPolicy()
);
// 记录访问日志
public void logAccess(HttpServletRequest request) {
    logExecutor.execute(() -> {
        // 构建日志对象,发送到 Kafka 或写入本地文件
        accessLogService.save(parseLog(request));
    });
}

进阶:可以结合采样,在丢弃时随机记录 1% 的丢弃事件用于监控。

场景5:批量数据导入(任务重,不允许丢失,可接受延迟)

业务特点

压力情况:任务提交可能短时间集中,但总任务量可控。

选择策略CallerRunsPolicy

理由

代码示例

ThreadPoolExecutor importExecutor = new ThreadPoolExecutor(
    4, 8, 5L, TimeUnit.MINUTES,
    new ArrayBlockingQueue<>(10),  // 小队列,让拒绝策略尽快生效
    new NamedThreadFactory("data-import"),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
// 批量提交导入任务
public void importLargeFiles(List<File> files) {
    for (File file : files) {
        importExecutor.execute(() -> importOneFile(file));
    }
    importExecutor.shutdown();
    importExecutor.awaitTermination(1, TimeUnit.HOURS);
}

注意CallerRunsPolicy 可能会导致调用者线程长时间阻塞,如果调用者是定时任务线程,可能影响其他定时任务。可以将调用者线程池也设置得足够健壮。

场景6:与消息队列结合(最终一致性,高可靠性)

业务特点

选择策略:自定义拒绝策略,将任务转发到 RocketMQ、Kafka 等。

理由

代码示例

public class MQBackedRejectedHandler implements RejectedExecutionHandler {
    private final RocketMQTemplate mqTemplate;
    private final String topic;
    public MQBackedRejectedHandler(RocketMQTemplate mqTemplate, String topic) {
        this.mqTemplate = mqTemplate;
        this.topic = topic;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            return;
        }
        // 将任务序列化后发送到 MQ
        if (r instanceof SerializableTask) {
            mqTemplate.syncSend(topic, ((SerializableTask) r).getPayload());
        } else {
            // 兜底:记录到数据库
            saveToDatabase(r);
        }
    }
}
// 线程池配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 50, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new NamedThreadFactory("worker"),
    new MQBackedRejectedHandler(mqTemplate, "rejected-task-topic")
);

注意:发送 MQ 本身也可能失败,需要做好重试和监控。

场景总结表

业务场景推荐拒绝策略理由风险提示
支付/下单(核心交易)AbortPolicy + 上层重试必须明确失败,不能静默丢弃调用方需处理异常
秒杀/抢购(快速失败)AbortPolicy / DiscardPolicy追求低延迟,超出直接拒绝丢弃率可能较高,前端需友好提示
异步通知(短信/邮件)DiscardOldestPolicy保证新消息优先,可少量丢失旧消息可能丢失
日志/审计DiscardPolicy海量数据,可丢失监控丢弃率,避免过高的丢失
批量导入(不允许丢)CallerRunsPolicy由调用者执行,不丢失调用者可能阻塞
高可靠异步任务自定义(转 MQ/DB)削峰填谷,保证最终执行增加系统复杂度

最佳实践建议

  1. 默认不要使用 AbortPolicy 而毫无处理:至少要在业务代码中捕获 RejectedExecutionException,记录日志或触发降级。
  2. 非核心业务优先使用 DiscardPolicy 并记录丢弃次数:用于容量规划。
  3. 所有拒绝策略都应该有监控:通过 Micrometer、Prometheus 暴露 rejected.count 指标。
  4. CallerRunsPolicy 要谨慎评估调用者线程:如果调用者是 Web 请求线程,可能导致请求超时堆积。
  5. 自定义拒绝策略时不要执行过于耗时的操作(如写数据库、发 MQ),否则会加剧线程池的阻塞。

通过结合具体业务场景选择合适的拒绝策略,可以平衡系统稳定性任务可靠性响应延迟三者之间的关系,构建高可用的并发系统。

到此这篇关于Java线程池拒绝策略场景分析的文章就介绍到这了,更多相关Java线程池拒绝策略内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文