java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot监控所有线程池

SpringBoot监控所有线程池的四种解决方案及代码案例

作者:学亮编程手记

这篇文章介绍了四种监控 Spring Boot 中所有线程池的解决方案,并提供了相应的代码案例。推荐使用自动发现机制来监控所有线程池,需要的朋友可以参考下。

问题分析

1.默认监控的局限性

/**
 * Minimal monitor that illustrates the limitation discussed in the article:
 * it reports on exactly one executor, the one injected into it.
 */
@Component
public class ThreadPoolMonitor {

    // The single executor bean this monitor can observe.
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /** Prints the active thread count of the injected executor to standard output. */
    public void monitor() {
        final int activeThreads = taskExecutor.getActiveCount();
        System.out.println("活跃线程: " + activeThreads);
    }
}

解决方案

方案1:手动注册所有线程池

/**
 * Periodically logs statistics for thread pools that were registered by hand.
 *
 * <p>Only executors passed to {@link #registerExecutor} are reported; nothing is
 * discovered automatically.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is
 * supplied by Lombok's {@code @Slf4j} — confirm before compiling.
 */
@Component
public class ThreadPoolMonitor {

    /** Registered executors, keyed by the caller-supplied name. */
    private final Map<String, ThreadPoolTaskExecutor> executors = new ConcurrentHashMap<>();

    /**
     * Registers an executor for monitoring, replacing any executor already
     * stored under the same name.
     *
     * @param name     label used for this pool in the log output
     * @param executor executor to report on
     */
    public void registerExecutor(String name, ThreadPoolTaskExecutor executor) {
        executors.put(name, executor);
    }

    /** Logs one statistics line per registered executor at a fixed rate of 30000 ms. */
    @Scheduled(fixedRate = 30000)
    public void monitorAll() {
        executors.forEach(this::logPool);
    }

    /**
     * Logs active/pool size, queue usage and completed task count for one pool.
     * An executor that has not been initialized yet is skipped so that it does
     * not abort the report for the remaining pools.
     */
    private void logPool(String name, ThreadPoolTaskExecutor executor) {
        final ThreadPoolExecutor pool;
        try {
            // Throws IllegalStateException (never returns null) before initialization.
            pool = executor.getThreadPoolExecutor();
        } catch (IllegalStateException e) {
            log.warn("线程池[{}] 尚未初始化, 已跳过", name, e);
            return;
        }

        int queued = pool.getQueue().size();
        // Summed as long: queues without a fixed bound may report
        // Integer.MAX_VALUE remaining, which would overflow an int sum.
        long queueCapacity = (long) queued + pool.getQueue().remainingCapacity();

        log.info("线程池[{}] - 活跃: {}/{}, 队列: {}/{}, 完成: {}",
            name,
            pool.getActiveCount(),
            pool.getPoolSize(),
            queued,
            queueCapacity,
            pool.getCompletedTaskCount());
    }
}

/**
 * Declares the application's executors and registers each one with the
 * {@link ThreadPoolMonitor} as it is created.
 *
 * <p>The bean methods do not call {@code initialize()} themselves: a
 * {@code ThreadPoolTaskExecutor} returned from a {@code @Bean} method is
 * initialized by the container, so an explicit call would build an underlying
 * pool that is immediately replaced by a second one.
 */
@Configuration
public class ExecutorConfig {

    @Autowired
    private ThreadPoolMonitor monitor;

    /** Executor bean named {@code emailExecutor}. */
    @Bean("emailExecutor")
    public ThreadPoolTaskExecutor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Pool settings (elided in the article) go here.

        // The monitor resolves the underlying pool lazily on each report,
        // so registering before container initialization is safe.
        monitor.registerExecutor("emailExecutor", executor);
        return executor;
    }

    /** Executor bean named {@code smsExecutor}. */
    @Bean("smsExecutor")
    public ThreadPoolTaskExecutor smsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Pool settings (elided in the article) go here.

        monitor.registerExecutor("smsExecutor", executor);
        return executor;
    }
}

方案2:自动发现所有线程池(推荐)

/**
 * Looks up every {@code ThreadPoolTaskExecutor} and {@code ThreadPoolExecutor}
 * bean in the application context on each run and logs its statistics.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is
 * supplied by Lombok's {@code @Slf4j} — confirm before compiling.
 */
@Component
public class GlobalThreadPoolMonitor {

    @Autowired
    private ApplicationContext applicationContext;

    /** Logs a report covering all executor beans at a fixed rate of 30000 ms. */
    @Scheduled(fixedRate = 30000)
    public void monitorAllThreadPools() {
        // Spring-wrapped executors.
        Map<String, ThreadPoolTaskExecutor> executors =
            applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class);

        // Executors created directly as java.util.concurrent pools.
        Map<String, ThreadPoolExecutor> nativeExecutors =
            applicationContext.getBeansOfType(ThreadPoolExecutor.class);

        log.info("=== 线程池监控报告 ===");

        executors.forEach((beanName, executor) -> {
            ThreadPoolExecutor pool;
            try {
                // Throws IllegalStateException (never returns null) before
                // initialization, so a null check cannot guard this call.
                pool = executor.getThreadPoolExecutor();
            } catch (IllegalStateException e) {
                log.warn("线程池[{}] 尚未初始化, 已跳过", beanName, e);
                return;
            }
            printPoolStats(beanName, pool);
        });

        nativeExecutors.forEach(this::printPoolStats);
    }

    /**
     * Logs one line for a pool: active threads / current pool size, queued
     * tasks / queue capacity, completed task count and the simple class name
     * of the rejection handler (the policy, not a count of rejections).
     */
    private void printPoolStats(String name, ThreadPoolExecutor executor) {
        int queued = executor.getQueue().size();
        // Summed as long: queues without a fixed bound may report
        // Integer.MAX_VALUE remaining, which would overflow an int sum.
        long queueCapacity = (long) queued + executor.getQueue().remainingCapacity();

        // Labels corrected to match the values: getPoolSize() is the current
        // size (not the core size) and the last value names the rejection policy.
        log.info("线程池[{}]: 活跃{}/当前{}, 队列{}/{}, 完成任务: {}, 拒绝策略: {}",
            name,
            executor.getActiveCount(),
            executor.getPoolSize(),
            queued,
            queueCapacity,
            executor.getCompletedTaskCount(),
            executor.getRejectedExecutionHandler().getClass().getSimpleName());
    }
}

方案3:监控 @Async 使用的线程池

/**
 * Reports on every {@code Executor} bean in the application context, which
 * includes executors used for {@code @Async} methods when they are beans.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is
 * supplied by Lombok's {@code @Slf4j} — confirm before compiling.
 */
@Component
public class AsyncThreadPoolMonitor {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Logs statistics for all {@code Executor} beans at a fixed rate of 30000 ms.
     * Thread-pool based executors get full statistics; any other
     * {@code TaskExecutor} is only listed by type. Failures are logged and
     * swallowed so that monitoring never breaks the scheduler.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorAsyncPools() {
        try {
            // Plain bean lookup by type (no reflection involved).
            Map<String, Executor> asyncExecutors =
                applicationContext.getBeansOfType(Executor.class);

            asyncExecutors.forEach((name, executor) -> {
                if (executor instanceof ThreadPoolTaskExecutor) {
                    ThreadPoolExecutor pool = ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor();
                    printAsyncPoolStats(name, pool);
                } else if (executor instanceof ThreadPoolExecutor) {
                    printAsyncPoolStats(name, (ThreadPoolExecutor) executor);
                } else if (executor instanceof TaskExecutor) {
                    log.info("Executor [{}]: 类型 {}", name, executor.getClass().getSimpleName());
                }
            });
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is kept.
            log.warn("监控异步线程池失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Logs active/maximum threads, usage percentage (one decimal place) and
     * queued tasks / queue capacity for one pool.
     */
    private void printAsyncPoolStats(String name, ThreadPoolExecutor pool) {
        int activeCount = pool.getActiveCount();
        int maxPoolSize = pool.getMaximumPoolSize();
        double usageRate = maxPoolSize > 0 ? (double) activeCount / maxPoolSize * 100 : 0;

        // SLF4J only understands "{}" placeholders; a "{:.1f}" token is printed
        // literally and shifts every following argument. Format the number first.
        String usage = String.format(java.util.Locale.ROOT, "%.1f", usageRate);

        int queued = pool.getQueue().size();
        // Summed as long: queues without a fixed bound may report
        // Integer.MAX_VALUE remaining, which would overflow an int sum.
        long queueCapacity = (long) queued + pool.getQueue().remainingCapacity();

        log.warn("异步线程池[{}]: 活跃{}/最大{}, 使用率: {}%, 队列: {}/{}",
            name,
            activeCount,
            maxPoolSize,
            usage,
            queued,
            queueCapacity);
    }
}

方案4:集成Micrometer监控(生产环境推荐)

/**
 * Exposes thread pool statistics as Micrometer gauges.
 *
 * <p>Pools can be registered explicitly through {@link #registerPool}; in
 * addition, all executor beans found in the application context are registered
 * once the application reports that it is ready.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is
 * supplied by Lombok's {@code @Slf4j} — confirm before compiling.
 */
@Component
public class MicrometerThreadPoolMonitor {

    private final MeterRegistry meterRegistry;
    /** Every pool handed to {@link #registerPool}, in registration order. */
    private final List<ThreadPoolExecutor> monitoredPools = new ArrayList<>();

    public MicrometerThreadPoolMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Records the pool and registers three gauges for it (active threads,
     * queue size, completed tasks), each tagged with {@code pool=<name>}.
     *
     * @param name value of the {@code pool} tag
     * @param pool executor whose statistics are sampled
     */
    public void registerPool(String name, ThreadPoolExecutor pool) {
        monitoredPools.add(pool);

        final String tagKey = "pool";

        Gauge.builder("thread.pool.active.count", pool, ThreadPoolExecutor::getActiveCount)
            .tag(tagKey, name)
            .description("活跃线程数")
            .register(meterRegistry);

        Gauge.builder("thread.pool.queue.size", pool, executor -> executor.getQueue().size())
            .tag(tagKey, name)
            .description("队列大小")
            .register(meterRegistry);

        Gauge.builder("thread.pool.completed.tasks", pool, ThreadPoolExecutor::getCompletedTaskCount)
            .tag(tagKey, name)
            .description("完成任务数")
            .register(meterRegistry);
    }

    /**
     * On {@code ApplicationReadyEvent}, registers every Spring-wrapped executor
     * bean under {@code spring-<beanName>} and every plain
     * {@code ThreadPoolExecutor} bean under {@code native-<beanName>}.
     */
    @EventListener
    public void onApplicationReady(ApplicationReadyEvent event) {
        final ApplicationContext context = event.getApplicationContext();

        final Map<String, ThreadPoolTaskExecutor> springExecutors =
            context.getBeansOfType(ThreadPoolTaskExecutor.class);
        final Map<String, ThreadPoolExecutor> nativeExecutors =
            context.getBeansOfType(ThreadPoolExecutor.class);

        for (Map.Entry<String, ThreadPoolTaskExecutor> entry : springExecutors.entrySet()) {
            final ThreadPoolExecutor delegate = entry.getValue().getThreadPoolExecutor();
            if (delegate != null) {
                registerPool("spring-" + entry.getKey(), delegate);
            }
        }

        for (Map.Entry<String, ThreadPoolExecutor> entry : nativeExecutors.entrySet()) {
            registerPool("native-" + entry.getKey(), entry.getValue());
        }

        log.info("已注册监控的线程池数量: {}", monitoredPools.size());
    }
}

完整的生产级监控方案

/**
 * Configuration that contributes a {@code GlobalThreadPoolMonitor} bean when
 * no bean of that type is already defined.
 *
 * <p>NOTE(review): the {@code GlobalThreadPoolMonitor} class shown alongside
 * this configuration is also annotated with {@code @Component}; if it is picked
 * up by component scanning this bean method is presumably redundant — confirm
 * which registration is intended.
 */
@Configuration
public class ThreadPoolMonitorConfig {

    /** Creates the monitor with its no-argument constructor. */
    @ConditionalOnMissingBean
    @Bean
    public GlobalThreadPoolMonitor globalThreadPoolMonitor() {
        final GlobalThreadPoolMonitor monitor = new GlobalThreadPoolMonitor();
        return monitor;
    }
}

/**
 * Discovers the thread pools defined as beans in the application context and
 * logs a status report for all of them at a fixed rate.
 *
 * <p>Pools are stored under a prefixed bean name: {@code Spring-<bean>} for
 * {@code ThreadPoolTaskExecutor} beans, {@code Native-<bean>} for
 * {@code ThreadPoolExecutor} beans and {@code Executor-<bean>} for pools that
 * are only found through the generic {@code Executor} lookup. Each pool
 * instance is registered once, under the first name it is discovered with.
 */
@Component
@Slf4j
public class GlobalThreadPoolMonitor {

    /** Usage (active / maximum threads, in percent) above which a pool is reported at WARN. */
    private static final double HIGH_USAGE_PERCENT = 80;

    @Autowired
    private ApplicationContext applicationContext;

    /** Discovered pools keyed by prefixed bean name. */
    private final Map<String, ThreadPoolExecutor> allPools = new ConcurrentHashMap<>();

    /** Runs an initial discovery once this bean has been constructed. */
    @PostConstruct
    public void init() {
        discoverAllThreadPools();
    }

    /**
     * Logs the status of every known pool at a fixed rate of 30000 ms,
     * repeating discovery first when no pool has been found so far.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorAllPools() {
        if (allPools.isEmpty()) {
            discoverAllThreadPools();
        }

        log.info("======= 线程池监控报告 =======");
        allPools.forEach(this::logPoolStatus);
        log.info("======= 监控报告结束 =======");
    }

    /** Looks up executor beans by type and registers each distinct pool once. */
    private void discoverAllThreadPools() {
        // Spring-wrapped executors.
        applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class)
            .forEach((name, executor) ->
                registerIfNew("Spring-" + name, executor.getThreadPoolExecutor()));

        // Plain java.util.concurrent pools.
        applicationContext.getBeansOfType(ThreadPoolExecutor.class)
            .forEach((name, executor) -> registerIfNew("Native-" + name, executor));

        // Generic Executor lookup (covers executors used by @Async). Pools the
        // two lookups above already registered are skipped, so only pools they
        // missed are added here — presumably beans whose declared type is just
        // Executor; verify against the bean definitions in use.
        applicationContext.getBeansOfType(Executor.class)
            .forEach((name, executor) -> {
                if (executor instanceof ThreadPoolTaskExecutor) {
                    registerIfNew("Executor-" + name,
                        ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor());
                } else if (executor instanceof ThreadPoolExecutor) {
                    registerIfNew("Executor-" + name, (ThreadPoolExecutor) executor);
                }
            });

        log.info("发现线程池数量: {}", allPools.size());
    }

    /**
     * Stores the pool under {@code key} unless the same instance is already
     * registered under another key. Without this check every pool would appear
     * twice (for example as both {@code Spring-x} and {@code Executor-x}),
     * doubling the report and the discovered-pool count.
     */
    private void registerIfNew(String key, ThreadPoolExecutor pool) {
        // Null is checked first because ConcurrentHashMap rejects null values.
        if (pool != null && !allPools.containsValue(pool)) {
            allPools.put(key, pool);
        }
    }

    /**
     * Logs one status line for a pool: at WARN with the usage percentage when
     * usage exceeds {@link #HIGH_USAGE_PERCENT}, otherwise at INFO with the
     * current pool size and completed task count.
     */
    private void logPoolStatus(String name, ThreadPoolExecutor pool) {
        int activeCount = pool.getActiveCount();
        int maxPoolSize = pool.getMaximumPoolSize();
        int queueSize = pool.getQueue().size();
        // Summed as long: queues without a fixed bound may report
        // Integer.MAX_VALUE remaining, which would overflow an int sum.
        long queueCapacity = (long) queueSize + pool.getQueue().remainingCapacity();

        String status = (activeCount == 0) ? "空闲" : "忙碌";
        double usageRate = maxPoolSize > 0 ? (double) activeCount / maxPoolSize * 100 : 0;

        if (usageRate > HIGH_USAGE_PERCENT) {
            // SLF4J only understands "{}" placeholders; a "{:.1f}" token is
            // printed literally and shifts every following argument.
            String usage = String.format(java.util.Locale.ROOT, "%.1f", usageRate);
            log.warn("🚨 线程池[{}]: {} 活跃{}/最大{} (使用率{}%), 队列{}/{}",
                name, status, activeCount, maxPoolSize, usage,
                queueSize, queueCapacity);
        } else {
            // Label says "当前" because the value is getPoolSize(), the current
            // number of threads, not the configured core size.
            log.info("线程池[{}]: {} 活跃{}/当前{}, 队列{}/{}, 完成: {}",
                name, status, activeCount, pool.getPoolSize(), queueSize, queueCapacity,
                pool.getCompletedTaskCount());
        }
    }

    /**
     * Returns a snapshot of one pool's statistics.
     *
     * @param poolName prefixed name the pool was registered under
     * @return the snapshot, or {@code null} when no pool is registered under that name
     */
    public ThreadPoolStats getPoolStats(String poolName) {
        ThreadPoolExecutor pool = allPools.get(poolName);
        if (pool == null) {
            return null;
        }
        return new ThreadPoolStats(
            pool.getActiveCount(),
            pool.getPoolSize(),
            pool.getQueue().size(),
            pool.getQueue().remainingCapacity(),
            pool.getCompletedTaskCount()
        );
    }

    /** Snapshot of a pool's counters as read by {@link #getPoolStats}. */
    @Data
    @AllArgsConstructor
    public static class ThreadPoolStats {
        private int activeCount;
        private int poolSize;
        private int queueSize;
        private int queueRemainingCapacity;
        private long completedTaskCount;
    }
}

总结

关键是要在应用启动后自动发现所有线程池实例,而不是依赖单个注入。

到此这篇关于SpringBoot监控所有线程池的四种解决方案及代码案例的文章就介绍到这了。更多相关SpringBoot监控所有线程池的内容,请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文