日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
超詳細(xì)的分布式調(diào)度框架Elastic-job實(shí)踐詳解

超詳細(xì)的分布式調(diào)度框架 Elastic-job 實(shí)踐詳解

作者: 鴨血粉絲 2021-01-28 07:32:14

開發(fā)

架構(gòu)

分布式 如果需要運(yùn)行的定時任務(wù)很少的話,使用 Quartz 不會有太大的問題,但是如果 現(xiàn)在有這么一個需求,例如理財(cái)產(chǎn)品,每天6點(diǎn)系統(tǒng)需要計(jì)算每個賬戶昨天的收益,假如這個理財(cái)產(chǎn)品,有幾個億的用戶,如果都在一個服務(wù)實(shí)例上跑,可能第二天都無法處理完這項(xiàng)任務(wù)!

為定結(jié)等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及定結(jié)網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、外貿(mào)網(wǎng)站建設(shè)、定結(jié)網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

本文轉(zhuǎn)載自微信公眾號「Java極客技術(shù)」,作者鴨血粉絲 。轉(zhuǎn)載本文請聯(lián)系Java極客技術(shù)公眾號。  

一、介紹

在前幾篇文章中,我們詳細(xì)的介紹了 Quartz 的架構(gòu)原理以及應(yīng)用實(shí)踐,雖然 Quartz 也可以通過集群方式來保證服務(wù)高可用,但是它也有一個的弊端,那就是服務(wù)節(jié)點(diǎn)數(shù)量的增加,并不能提升任務(wù)的執(zhí)行效率,即不能實(shí)現(xiàn)水平擴(kuò)展!

之所以產(chǎn)生這樣的結(jié)果,是因?yàn)?Quartz 在分布式集群環(huán)境下是通過數(shù)據(jù)庫鎖方式來實(shí)現(xiàn)有且只有一個有效的服務(wù)節(jié)點(diǎn)來運(yùn)行服務(wù),從而保證服務(wù)在集群環(huán)境下定時任務(wù)不會被重復(fù)調(diào)用!

如果需要運(yùn)行的定時任務(wù)很少的話,使用 Quartz 不會有太大的問題,但是如果 現(xiàn)在有這么一個需求,例如理財(cái)產(chǎn)品,每天6點(diǎn)系統(tǒng)需要計(jì)算每個賬戶昨天的收益,假如這個理財(cái)產(chǎn)品,有幾個億的用戶,如果都在一個服務(wù)實(shí)例上跑,可能第二天都無法處理完這項(xiàng)任務(wù)!

類似這樣場景還有很多很多,很顯然 Quartz 很難滿足我們這種大批量、任務(wù)執(zhí)行周期長的任務(wù)調(diào)度!

因此短板,當(dāng)當(dāng)網(wǎng)基于 Quartz 開發(fā)了一套適合在分布式環(huán)境下能高效率的使用服務(wù)器資源的 Elastic-Job 定時任務(wù)框架!

Elastic-Job-Lite最大的亮點(diǎn)就是支持彈性擴(kuò)容縮容,怎么實(shí)現(xiàn)的呢?

比如現(xiàn)在有個任務(wù)要執(zhí)行,如果將任務(wù)進(jìn)行分片成10個,那么可以同時在10個服務(wù)實(shí)例上并行執(zhí)行,互相不影響,從而大大的提升了任務(wù)執(zhí)行效率,并且充分的利用服務(wù)器資源!

對于上面的理財(cái)產(chǎn)品,如果這個任務(wù)需要處理1個億用戶,那么我們可以通過水平擴(kuò)展,比如對任務(wù)進(jìn)行分片為500,讓500個服務(wù)實(shí)例同時運(yùn)行,每個服務(wù)實(shí)例處理20萬條數(shù)據(jù),不出意外的話,1 - 2個小時可以全部跑完,如果時間還是很長,還可以繼續(xù)水平擴(kuò)張,添加服務(wù)實(shí)例來運(yùn)行!

2015 年,當(dāng)當(dāng)網(wǎng)將其開源,瞬間吸引了一大批程序員的關(guān)注,同時登頂開源中國第一名!

下面我們就一起來了解一下這款使用非常廣泛的分布式調(diào)度框架。

二、項(xiàng)目架構(gòu)介紹

Elastic-Job 最開始只有一個 elastic-job-core 的項(xiàng)目,定位輕量級、無中心化,最核心的服務(wù)就是支持彈性擴(kuò)容和數(shù)據(jù)分片!

從 2.X 版本以后,主要分為 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子項(xiàng)目。

其中,Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 務(wù) 的 協(xié) 調(diào) 服 務(wù) 。

而 Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)(跟 Lite 的區(qū)別只是部署方式不同,他們使用相同的 API,只要開發(fā)一次)。

今天我們主要介紹的是Elastic-Job-Lite,最主要的功能特性如下:

  • 分布式調(diào)度協(xié)調(diào):采用 zookeeper 實(shí)現(xiàn)注冊中心,進(jìn)行統(tǒng)一調(diào)度。
  • 支持任務(wù)分片:將需要執(zhí)行的任務(wù)進(jìn)行分片,實(shí)現(xiàn)并行調(diào)度。
  • 支持彈性擴(kuò)容縮容:將任務(wù)拆分為 n 個任務(wù)項(xiàng)后,各個服務(wù)器分別執(zhí)行各自分配到的任務(wù)項(xiàng)。一旦有新的服務(wù)器加入集群,或現(xiàn)有服務(wù)器下線,elastic-job 將在保留本次任務(wù)執(zhí)行不變的情況下,下次任務(wù)開始前觸發(fā)任務(wù)重分片。

當(dāng)然,還有失效轉(zhuǎn)移、錯過執(zhí)行作業(yè)重觸發(fā)等等功能,大家可以訪問官網(wǎng)文檔,以獲取更多詳細(xì)資料。

應(yīng)用在各自的節(jié)點(diǎn)執(zhí)行任務(wù),通過 zookeeper 注冊中心協(xié)調(diào)。節(jié)點(diǎn)注冊、節(jié)點(diǎn)選舉、任務(wù)分片、監(jiān)聽都在 E-Job 的代碼中完成。下圖是官網(wǎng)提供得架構(gòu)圖。

 

啥也不用多說了,下面我們直接通過實(shí)踐介紹,更容易了解里面是怎么玩的!

三、應(yīng)用實(shí)踐

3.1、zookeeper 安裝

elastic-job-lite,是直接依賴 zookeeper 的,因此在開發(fā)之前我們需要先準(zhǔn)備好對應(yīng)的 zookeeper 環(huán)境,關(guān)于 zookeeper 的安裝過程,就不多說了,非常簡單,網(wǎng)上都有教程!

3.2、elastic-job-lite-console 安裝

elastic-job-lite-console,主要是一個任務(wù)作業(yè)可視化界面管理系統(tǒng)。

可以單獨(dú)部署,與平臺不關(guān),主要是通過配置注冊中心和數(shù)據(jù)源來抓取數(shù)據(jù)。

獲取的方式也很簡單,直接訪問https://github.com/apache/shardingsphere-elasticjob地址,然后切換到2.1.5的版本號,然后執(zhí)行mvn clean install進(jìn)行打包,獲取對應(yīng)的安裝包將其解壓,進(jìn)行bin文件夾啟動服務(wù)即可!

 

如果你的網(wǎng)速像蝸牛一樣的慢,還有一個辦法就是從這個地址https://gitee.com/elasticjob/elastic-job獲取對應(yīng)的源碼!

啟動服務(wù)后,在瀏覽器訪問http://127.0.0.1:8899,輸入賬戶、密碼(都是root)即可進(jìn)入控制臺頁面,類似如下界面!

 

進(jìn)入之后,將上文所在的 zookeeper 注冊中心進(jìn)行配置,包括數(shù)據(jù)庫 mysql 的數(shù)據(jù)源也可以配置一下!

3.3、創(chuàng)建工程

本文采用springboot來搭建工程為例,創(chuàng)建工程并添加elastic-job-lite依賴!

  
 
 
 
  1.  
  2.  
  3.     com.dangdang 
  4.     elastic-job-lite-core 
  5.     2.1.5 
  6.  
  7.  
  8.  
  9.  
  10.     com.dangdang 
  11.     elastic-job-lite-spring 
  12.     2.1.5 
  13.  

在配置文件application.properties中提前配置好 zookeeper 注冊中心相關(guān)信息!

  
 
 
 
  1. #zookeeper config 
  2. zookeeper.serverList=127.0.0.1:2181 
  3. zookeeper.namespace=example-elastic-job-test 

3.4、新建 ZookeeperConfig 配置類

  
 
 
 
  1. @Configuration 
  2. @ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0") 
  3. public class ZookeeperConfig { 
  4.  
  5.     /** 
  6.      * zookeeper 配置 
  7.      * @return 
  8.      */ 
  9.     @Bean(initMethod = "init") 
  10.     public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList,  
  11.                                                            @Value("${zookeeper.namespace}") String namespace){ 
  12.         return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace)); 
  13.     } 
  14.  

3.5、新建任務(wù)處理類

elastic-job支持三種類型的作業(yè)任務(wù)處理!

  • Simple 類型作業(yè):Simple 類型用于一般任務(wù)的處理,只需實(shí)現(xiàn)SimpleJob接口。該接口僅提供單一方法用于覆蓋,此方法將定時執(zhí)行,與Quartz原生接口相似。
  • Dataflow 類型作業(yè):Dataflow 類型用于處理數(shù)據(jù)流,需實(shí)現(xiàn)DataflowJob接口。該接口提供2個方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。
  • Script類型作業(yè):Script 類型作業(yè)意為腳本類型作業(yè),支持 shell,python,perl等所有類型腳本。只需通過控制臺或代碼配置 scriptCommandLine 即可,無需編碼。執(zhí)行腳本路徑可包含參數(shù),參數(shù)傳遞完畢后,作業(yè)框架會自動追加最后一個參數(shù)為作業(yè)運(yùn)行時信息。

3.6、新建 Simple 類型作業(yè)

編寫一個SimpleJob接口的實(shí)現(xiàn)類MySimpleJob,當(dāng)前工作主要是打印一條日志。

  
 
 
 
  1. @Slf4j 
  2. public class MySimpleJob implements SimpleJob { 
  3.  
  4.     @Override 
  5.     public void execute(ShardingContext shardingContext) { 
  6.         log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " + 
  7.                         "當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," + 
  8.                         "作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s" 
  9.                 , 
  10.                 Thread.currentThread().getId(), 
  11.                 shardingContext.getShardingTotalCount(), 
  12.                 shardingContext.getShardingItem(), 
  13.                 shardingContext.getShardingParameter(), 
  14.                 shardingContext.getJobName(), 
  15.                 shardingContext.getJobParameter() 
  16.         )); 
  17.     } 

創(chuàng)建一個MyElasticJobListener任務(wù)監(jiān)聽器,用于監(jiān)聽MySimpleJob的任務(wù)執(zhí)行情況。

  
 
 
 
  1. @Slf4j 
  2. public class MyElasticJobListener implements ElasticJobListener { 
  3.  
  4.     private long beginTime = 0; 
  5.  
  6.     @Override 
  7.     public void beforeJobExecuted(ShardingContexts shardingContexts) { 
  8.         beginTime = System.currentTimeMillis(); 
  9.         log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); 
  10.     } 
  11.  
  12.     @Override 
  13.     public void afterJobExecuted(ShardingContexts shardingContexts) { 
  14.         long endTime = System.currentTimeMillis(); 
  15.         log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); 
  16.     } 
  17.  

創(chuàng)建一個MySimpleJobConfig類,將MySimpleJob其注入到zookeeper。

  
 
 
 
  1. @Configuration 
  2. public class MySimpleJobConfig { 
  3.  
  4.     /** 
  5.      * 任務(wù)名稱 
  6.      */ 
  7.     @Value("${simpleJob.mySimpleJob.name}") 
  8.     private String mySimpleJobName; 
  9.  
  10.     /** 
  11.      * cron表達(dá)式 
  12.      */ 
  13.     @Value("${simpleJob.mySimpleJob.cron}") 
  14.     private String mySimpleJobCron; 
  15.  
  16.     /** 
  17.      * 作業(yè)分片總數(shù) 
  18.      */ 
  19.     @Value("${simpleJob.mySimpleJob.shardingTotalCount}") 
  20.     private int mySimpleJobShardingTotalCount; 
  21.  
  22.     /** 
  23.      * 作業(yè)分片參數(shù) 
  24.      */ 
  25.     @Value("${simpleJob.mySimpleJob.shardingItemParameters}") 
  26.     private String mySimpleJobShardingItemParameters; 
  27.  
  28.     /** 
  29.      * 自定義參數(shù) 
  30.      */ 
  31.     @Value("${simpleJob.mySimpleJob.jobParameters}") 
  32.     private String mySimpleJobParameters; 
  33.  
  34.     @Autowired 
  35.     private ZookeeperRegistryCenter registryCenter; 
  36.  
  37.     @Bean 
  38.     public MySimpleJob mySimpleJob() { 
  39.         return new MySimpleJob(); 
  40.     } 
  41.  
  42.     @Bean(initMethod = "init") 
  43.     public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { 
  44.   //配置任務(wù)監(jiān)聽器 
  45.    MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  46.         return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  47.     } 
  48.  
  49.     private LiteJobConfiguration getLiteJobConfiguration() { 
  50.         // 定義作業(yè)核心配置 
  51.         JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). 
  52.                 shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); 
  53.         // 定義SIMPLE類型配置 
  54.         SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); 
  55.         // 定義Lite作業(yè)根配置 
  56.         LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); 
  57.         return simpleJobRootConfig; 
  58.  
  59.     } 

在配置文件application.properties中配置好對應(yīng)的mySimpleJob參數(shù)!

  
 
 
 
  1. #elastic job 
  2. #simpleJob類型的job 
  3. simpleJob.mySimpleJob.name=mySimpleJob 
  4. simpleJob.mySimpleJob.cron=0/15 * * * * ? 
  5. simpleJob.mySimpleJob.shardingTotalCount=3 
  6. simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c 
  7. simpleJob.mySimpleJob.jobParameters=helloWorld 

運(yùn)行程序,看看效果如何?

 

在上圖demo中,配置的分片數(shù)為3,這個時候會有3個線程進(jìn)行同時執(zhí)行任務(wù),因?yàn)槎际窃谝慌_機(jī)器上執(zhí)行的,這個任務(wù)被執(zhí)行來3次,下面修改一下端口配置,創(chuàng)建三個相同的服務(wù)實(shí)例,在看看效果如下:

 

很清晰的看到任務(wù)被執(zhí)行一次!

3.7、新建 DataFlowJob 類型作業(yè)

DataFlowJob 類型的任務(wù)配置和SimpleJob類似,操作也很簡單!

創(chuàng)建一個DataflowJob類型的實(shí)現(xiàn)類MyDataFlowJob。

  
 
 
 
  1. @Slf4j 
  2. public class MyDataFlowJob implements DataflowJob { 
  3.  
  4.     private boolean flag = false; 
  5.  
  6.     @Override 
  7.     public List fetchData(ShardingContext shardingContext) { 
  8.         log.info("開始獲取數(shù)據(jù)"); 
  9.         if (flag) { 
  10.             return null; 
  11.         } 
  12.         return Arrays.asList("qingshan", "jack", "seven"); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void processData(ShardingContext shardingContext, List data) { 
  17.         for (String val : data) { 
  18.             // 處理完數(shù)據(jù)要移除掉,不然就會一直跑,處理可以在上面的方法里執(zhí)行。這里采用 flag 
  19.             log.info("開始處理數(shù)據(jù):" + val); 
  20.         } 
  21.         flag = true; 
  22.     } 

接著創(chuàng)建MyDataFlowJob的配置類,將其注入到zookeeper注冊中心。

  
 
 
 
  1. Configuration 
  2. public class MyDataFlowJobConfig { 
  3.  
  4.     /** 
  5.      * 任務(wù)名稱 
  6.      */ 
  7.     @Value("${dataflowJob.myDataflowJob.name}") 
  8.     private String jobName; 
  9.  
  10.     /** 
  11.      * cron表達(dá)式 
  12.      */ 
  13.     @Value("${dataflowJob.myDataflowJob.cron}") 
  14.     private String jobCron; 
  15.  
  16.     /** 
  17.      * 作業(yè)分片總數(shù) 
  18.      */ 
  19.     @Value("${dataflowJob.myDataflowJob.shardingTotalCount}") 
  20.     private int jobShardingTotalCount; 
  21.  
  22.     /** 
  23.      * 作業(yè)分片參數(shù) 
  24.      */ 
  25.     @Value("${dataflowJob.myDataflowJob.shardingItemParameters}") 
  26.     private String jobShardingItemParameters; 
  27.  
  28.     /** 
  29.      * 自定義參數(shù) 
  30.      */ 
  31.     @Value("${dataflowJob.myDataflowJob.jobParameters}") 
  32.     private String jobParameters; 
  33.  
  34.     @Autowired 
  35.     private ZookeeperRegistryCenter registryCenter; 
  36.  
  37.  
  38.     @Bean 
  39.     public MyDataFlowJob myDataFlowJob() { 
  40.         return new MyDataFlowJob(); 
  41.     } 
  42.  
  43.     @Bean(initMethod = "init") 
  44.     public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) { 
  45.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  46.         return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  47.     } 
  48.  
  49.     private LiteJobConfiguration getLiteJobConfiguration() { 
  50.         // 定義作業(yè)核心配置 
  51.         JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). 
  52.                 shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); 
  53.         // 定義DATAFLOW類型配置 
  54.         DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false); 
  55.         // 定義Lite作業(yè)根配置 
  56.         LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build(); 
  57.         return dataflowJobRootConfig; 
  58.  
  59.     } 

最后,在配置文件application.properties中配置好對應(yīng)的myDataflowJob參數(shù)!

  
 
 
 
  1. #dataflow類型的job 
  2. dataflowJob.myDataflowJob.name=myDataflowJob 
  3. dataflowJob.myDataflowJob.cron=0/15 * * * * ? 
  4. dataflowJob.myDataflowJob.shardingTotalCount=1 
  5. dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c 
  6. dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter 

運(yùn)行程序,看看效果如何?

 

需要注意的地方是,如果配置的是流式處理類型,它會不停的拉取數(shù)據(jù)、處理數(shù)據(jù),在拉取的時候,如果返回為空,就不會處理數(shù)據(jù)!

如果配置的是非流式處理類型,和上面介紹的simpleJob類型,處理一樣!

3.8、新建 ScriptJob 類型作業(yè)

ScriptJob 類型的任務(wù)配置和上面類似,主要是用于定時執(zhí)行某個腳本,一般用的比較少!

因?yàn)槟繕?biāo)是腳本,沒有執(zhí)行的任務(wù),所以無需編寫任務(wù)作業(yè)類型!

只需要編寫一個ScriptJob類型的配置類即可,命令是echo 'Hello World !內(nèi)容!

  
 
 
 
  1. @Configuration 
  2. public class MyScriptJobConfig { 
  3.  
  4.     /** 
  5.      * 任務(wù)名稱 
  6.      */ 
  7.     @Value("${scriptJob.myScriptJob.name}") 
  8.     private String jobName; 
  9.  
  10.     /** 
  11.      * cron表達(dá)式 
  12.      */ 
  13.     @Value("${scriptJob.myScriptJob.cron}") 
  14.     private String jobCron; 
  15.  
  16.     /** 
  17.      * 作業(yè)分片總數(shù) 
  18.      */ 
  19.     @Value("${scriptJob.myScriptJob.shardingTotalCount}") 
  20.     private int jobShardingTotalCount; 
  21.  
  22.     /** 
  23.      * 作業(yè)分片參數(shù) 
  24.      */ 
  25.     @Value("${scriptJob.myScriptJob.shardingItemParameters}") 
  26.     private String jobShardingItemParameters; 
  27.  
  28.     /** 
  29.      * 自定義參數(shù) 
  30.      */ 
  31.     @Value("${scriptJob.myScriptJob.jobParameters}") 
  32.     private String jobParameters; 
  33.  
  34.     @Autowired 
  35.     private ZookeeperRegistryCenter registryCenter; 
  36.  
  37.  
  38.     @Bean(initMethod = "init") 
  39.     public JobScheduler scriptJobScheduler() { 
  40.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  41.         return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  42.     } 
  43.  
  44.     private LiteJobConfiguration getLiteJobConfiguration() { 
  45.         // 定義作業(yè)核心配置 
  46.         JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). 
  47.                 shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); 
  48.         // 定義SCRIPT類型配置 
  49.         ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'"); 
  50.         // 定義Lite作業(yè)根配置 
  51.         LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build(); 
  52.         return scriptJobRootConfig; 
  53.  
  54.     } 

在配置文件application.properties中配置好對應(yīng)的myScriptJob參數(shù)!

  
 
 
 
  1. #script類型的job 
  2. scriptJob.myScriptJob.name=myScriptJob 
  3. scriptJob.myScriptJob.cron=0/15 * * * * ? 
  4. scriptJob.myScriptJob.shardingTotalCount=3 
  5. scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=c 
  6. scriptJob.myScriptJob.jobParameters=myScriptJobParamter 

運(yùn)行程序,看看效果如何?

 

3.9、將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫

可能有的人會發(fā)出疑問,elastic-job是如何存儲數(shù)據(jù)的,用ZooInspector客戶端鏈接zookeeper注冊中心,你發(fā)現(xiàn)對應(yīng)的任務(wù)配置被存儲到相應(yīng)的樹根上!

 

而具體作業(yè)任務(wù)執(zhí)行軌跡和狀態(tài)結(jié)果是不會存儲到zookeeper,需要我們在項(xiàng)目中通過數(shù)據(jù)源方式進(jìn)行持久化!

將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫配置過程也很簡單,只需要在對應(yīng)的配置類上注入數(shù)據(jù)源即可,以MySimpleJobConfig為例,代碼如下:

  
 
 
 
  1. @Configuration 
  2. public class MySimpleJobConfig { 
  3.  
  4.     /** 
  5.      * 任務(wù)名稱 
  6.      */ 
  7.     @Value("${simpleJob.mySimpleJob.name}") 
  8.     private String mySimpleJobName; 
  9.  
  10.     /** 
  11.      * cron表達(dá)式 
  12.      */ 
  13.     @Value("${simpleJob.mySimpleJob.cron}") 
  14.     private String mySimpleJobCron; 
  15.  
  16.     /** 
  17.      * 作業(yè)分片總數(shù) 
  18.      */ 
  19.     @Value("${simpleJob.mySimpleJob.shardingTotalCount}") 
  20.     private int mySimpleJobShardingTotalCount; 
  21.  
  22.     /** 
  23.      * 作業(yè)分片參數(shù) 
  24.      */ 
  25.     @Value("${simpleJob.mySimpleJob.shardingItemParameters}") 
  26.     private String mySimpleJobShardingItemParameters; 
  27.  
  28.     /** 
  29.      * 自定義參數(shù) 
  30.      */ 
  31.     @Value("${simpleJob.mySimpleJob.jobParameters}") 
  32.     private String mySimpleJobParameters; 
  33.  
  34.     @Autowired 
  35.     private ZookeeperRegistryCenter registryCenter; 
  36.  
  37.     @Autowired 
  38.     private DataSource dataSource;; 
  39.  
  40.  
  41.     @Bean 
  42.     public MySimpleJob stockJob() { 
  43.         return new MySimpleJob(); 
  44.     } 
  45.  
  46.     @Bean(initMethod = "init") 
  47.     public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { 
  48.         //添加事件數(shù)據(jù)源配置 
  49.         JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); 
  50.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  51.         return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener); 
  52.     } 
  53.  
  54.     private LiteJobConfiguration getLiteJobConfiguration() { 
  55.         // 定義作業(yè)核心配置 
  56.         JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). 
  57.                 shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); 
  58.         // 定義SIMPLE類型配置 
  59.         SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); 
  60.         // 定義Lite作業(yè)根配置 
  61.         LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); 
  62.         return simpleJobRootConfig; 
  63.  
  64.     } 

同時,需要在配置文件application.properties中配置好對應(yīng)的datasource參數(shù)!

  
 
 
 
  1. spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-test 
  2. spring.datasource.username=root 
  3. spring.datasource.password=root 
  4. spring.datasource.driver-class-name=com.mysql.jdbc.Driver 

運(yùn)行程序,然后在elastic-job-lite-console控制臺配置對應(yīng)的數(shù)據(jù)源!

 

最后,點(diǎn)擊【作業(yè)軌跡】即可查看對應(yīng)作業(yè)執(zhí)行情況!

 

四、小結(jié)

本文主要圍繞elasticjob的使用進(jìn)行簡單介紹,希望大家有所收獲!

在分布式環(huán)境環(huán)境下,elastic-job-lite支持的彈性擴(kuò)容、任務(wù)分片是最大的亮點(diǎn),在實(shí)際使用的時候,任務(wù)分片總數(shù)盡可能大于服務(wù)實(shí)例個數(shù),并且是倍數(shù)關(guān)系,這樣任務(wù)在分片的時候,會更加均勻!

如果想深入的了解elasticjob,大家可以訪問官方文檔,獲取更加詳細(xì)的使用教程!

五、參考1、elasticjob - 官方文檔

2、博客園 - 吳振照 - 任務(wù)調(diào)度之 Elastic Job


本文標(biāo)題:超詳細(xì)的分布式調(diào)度框架Elastic-job實(shí)踐詳解
URL分享:http://www.5511xx.com/article/dphiijs.html