日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
利用DUCC配置平臺(tái)實(shí)現(xiàn)一個(gè)動(dòng)態(tài)化線程池

?作者:京東零售 張賓

1.背景

在后臺(tái)開發(fā)中,會(huì)經(jīng)常用到線程池技術(shù),對(duì)于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗(yàn)。然而,由于系統(tǒng)運(yùn)行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個(gè)合理的線程池參數(shù)。在對(duì)線程池配置參數(shù)進(jìn)行調(diào)整時(shí),一般需要對(duì)服務(wù)進(jìn)行重啟,這樣修改的成本就會(huì)偏高。一種解決辦法就是,將線程池的配置放到配置平臺(tái)側(cè),系統(tǒng)運(yùn)行期間開發(fā)人員根據(jù)系統(tǒng)運(yùn)行情況對(duì)核心參數(shù)進(jìn)行動(dòng)態(tài)配置。

本文以公司DUCC配置平臺(tái)作為服務(wù)配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)化線程池。

2.代碼實(shí)現(xiàn)

當(dāng)前項(xiàng)目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize可以在運(yùn)行時(shí)設(shè)置核心線程數(shù)和最大線程數(shù)。

setCorePoolSize方法執(zhí)行流程是:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的corePoolSize,然后,如果新的值比原始值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀,如果新的值比原來的值要大且工作隊(duì)列不為空,則會(huì)創(chuàng)建新的工作線程。流程圖如下:

setMaximumPoolSize方法:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的maximumPoolSize,然后,如果新的值比原來的值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀。

Spring 框架提供的線程池類ThreadPoolTaskExecutor,此類封裝了對(duì)ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize的調(diào)用。

基于以上源代碼分析,要實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)線程池需要以下幾步:

(1)定義一個(gè)動(dòng)態(tài)線程池類,繼承ThreadPoolTaskExecutor,目的跟非動(dòng)態(tài)配置的線程池類ThreadPoolTaskExecutor區(qū)分開;

(2)定義和實(shí)現(xiàn)一個(gè)動(dòng)態(tài)線程池配置定時(shí)刷的類,目的定時(shí)對(duì)比ducc配置的線程池?cái)?shù)和本地應(yīng)用中線程數(shù)是否一致,若不一致,則更新本地動(dòng)態(tài)線程池線程池?cái)?shù);

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key;

(4)定義和實(shí)現(xiàn)一個(gè)應(yīng)用啟動(dòng)后根據(jù)動(dòng)態(tài)線程池Bean和從ducc配置平臺(tái)拉取配置刷新應(yīng)用中的線程數(shù)配置;

接下來代碼一一實(shí)現(xiàn):

(1)動(dòng)態(tài)線程池類

/**
* 動(dòng)態(tài)線程池
*
*/
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)動(dòng)態(tài)線程池配置定時(shí)刷新類

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
/**
* Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
*/
private static final ConcurrentMap DTP_REGISTRY = new ConcurrentHashMap<>();

/**
* @param threadPoolBeanName
* @param threadPoolTaskExecutor
*/
public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
}

@Override
public void afterPropertiesSet() throws Exception {
this.refresh();
//創(chuàng)建定時(shí)任務(wù)線程池
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
//延遲1秒執(zhí)行,每個(gè)1分鐘check一次
executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
}

private void refresh() {
String dynamicThreadPool = "";
try {
if (DTP_REGISTRY.isEmpty()) {
log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
return;
}
dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
if (StringUtils.isBlank(dynamicThreadPool)) {
log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
return;
}
log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
List threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference>() {
});
if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
return;
}
for (ThreadPoolProperties properties : threadPoolPropertiesList) {
doRefresh(properties);
}
} catch (Exception e) {
log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
}
}

/**
* @param properties
*/
private void doRefresh(ThreadPoolProperties properties) {
if (StringUtils.isBlank(properties.getThreadPoolBeanName())
|| properties.getCorePoolSize() < 1
|| properties.getMaxPoolSize() < 1
|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {
log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
return;
}
DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
if (Objects.isNull(threadPoolTaskExecutor)) {
log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
return;
}
ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
return;
}
if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
}
if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
}

ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
}

private class RefreshThreadPoolConfig extends TimerTask {
private RefreshThreadPoolConfig() {
}

@Override
public void run() {
DynamicThreadPoolRefresh.this.refresh();
}
}

}

線程池配置類

@Data
public class ThreadPoolProperties {
/**
* 線程池名稱
*/
private String threadPoolBeanName;
/**
* 線程池核心線程數(shù)量
*/
private int corePoolSize;
/**
* 線程池最大線程池?cái)?shù)量
*/
private int maxPoolSize;
}

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key

配置value:

[
{
"threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
"corePoolSize": 32,
"maxPoolSize": 128
}
]

(4) 應(yīng)用啟動(dòng)刷新應(yīng)用本地動(dòng)態(tài)線程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolTaskExecutor) {
DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
}
return bean;
}
}

3.動(dòng)態(tài)線程池應(yīng)用

動(dòng)態(tài)線程池Bean聲明









































業(yè)務(wù)類注入Spring Bean后,直接使用即可

@Resource
private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;


Runnable asyncTask = ()->{...};
CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小結(jié)

本文從實(shí)際項(xiàng)目的業(yè)務(wù)痛點(diǎn)場(chǎng)景出發(fā),并基于公司已有的ducc配置平臺(tái)簡(jiǎn)單實(shí)現(xiàn)了線程池線程數(shù)量可配置。?


當(dāng)前名稱:利用DUCC配置平臺(tái)實(shí)現(xiàn)一個(gè)動(dòng)態(tài)化線程池
分享鏈接:http://www.5511xx.com/article/djoiisd.html