利用DUCC配置平台实现一个动态化线程池

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。
首页 新闻资讯 行业资讯 利用DUCC配置平台实现一个动态化线程池

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

c266424504c5dc199536888d8a5fbcd1648fd1.png

setMaximumPoolSize方法:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

347a5c8907b7c6c464b830843620eaede89918.png

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

复制

/** * 动态线程池 * */public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

(2)动态线程池配置定时刷新类

复制

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {/**     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.     */private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();/**     * @param threadPoolBeanName     * @param threadPoolTaskExecutor     */public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);}@Override
    public void afterPropertiesSet() throws Exception {this.refresh();//创建定时任务线程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());//延迟1秒执行,每个1分钟check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);}private void refresh() {String dynamicThreadPool = "";try {if (DTP_REGISTRY.isEmpty()) {log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");return;}dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);if (StringUtils.isBlank(dynamicThreadPool)) {log.debug("DynamicThreadPool refresh dynamicThreadPool not config");return;}log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {});if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);return;}for (ThreadPoolProperties properties : threadPoolPropertiesList) {doRefresh(properties);}} catch (Exception e) {log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);}}/**     * @param properties     */private void doRefresh(ThreadPoolProperties properties) {if (StringUtils.isBlank(properties.getThreadPoolBeanName())|| properties.getCorePoolSize() < 1|| properties.getMaxPoolSize() < 1|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);return;}DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());if (Objects.isNull(threadPoolTaskExecutor)) {log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());return;}ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());return;}if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());}if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());}   
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);}private class RefreshThreadPoolConfig extends TimerTask {private RefreshThreadPoolConfig() {}@Override
        public void run() {DynamicThreadPoolRefresh.this.refresh();}}}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

  • 48.

  • 49.

  • 50.

  • 51.

  • 52.

  • 53.

  • 54.

  • 55.

  • 56.

  • 57.

  • 58.

  • 59.

  • 60.

  • 61.

  • 62.

  • 63.

  • 64.

  • 65.

  • 66.

  • 67.

  • 68.

  • 69.

  • 70.

  • 71.

  • 72.

  • 73.

  • 74.

  • 75.

  • 76.

  • 77.

  • 78.

  • 79.

  • 80.

  • 81.

  • 82.

  • 83.

  • 84.

  • 85.

  • 86.

  • 87.

  • 88.

  • 89.

  • 90.

  • 91.

  • 92.

  • 93.

  • 94.

  • 95.

  • 96.

  • 97.

  • 98.

线程池配置类

复制

@Data
public class ThreadPoolProperties {/**     * 线程池名称     */private String threadPoolBeanName;/**     * 线程池核心线程数量     */private int corePoolSize;/**     * 线程池最大线程池数量     */private int maxPoolSize;}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

配置value:

复制

[
  {"threadPoolBeanName": "submitOrderThreadPoolTaskExecutor","corePoolSize": 32,"maxPoolSize": 128
  }]
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

(4) 应用启动刷新应用本地动态线程池配置

复制

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {@Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DynamicThreadPoolTaskExecutor) {DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);}return bean;}}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

3.动态线程池应用

动态线程池Bean声明

复制

<!-- 普通线程池 --><bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper"><!-- 核心线程数,默认为 --><property name="corePoolSize" value="128"/><!-- 最大线程数,默认为Integer.MAX_VALUE --><property name="maxPoolSize" value="512"/><!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --><property name="queueCapacity" value="500"/><!-- 线程池维护线程所允许的空闲时间,默认为60s --><property name="keepAliveSeconds" value="60"/><!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --><property name="rejectedExecutionHandler"><!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --><!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --><!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property></bean><!-- 动态线程池 --><bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor"><!-- 核心线程数,默认为 --><property name="corePoolSize" value="32"/><!-- 最大线程数,默认为Integer.MAX_VALUE --><property name="maxPoolSize" value="128"/><!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --><property name="queueCapacity" value="500"/><!-- 线程池维护线程所允许的空闲时间,默认为60s --><property name="keepAliveSeconds" value="60"/><!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --><property name="rejectedExecutionHandler"><!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --><!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --><!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property></bean><!-- 动态线程池刷新配置 --><bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/><bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

业务类注入Spring Bean后,直接使用即可

复制

@Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;

 
 Runnable asyncTask = ()->{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

34    2023-03-08 07:43:07    DUCC 配置 平台