日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
徹底了解線程池的原理—40行從零開(kāi)始自己寫(xiě)線程池

前言

在我們的日常的編程當(dāng)中,并發(fā)是始終離不開(kāi)的主題,而在并發(fā)多線程當(dāng)中,線程池又是一個(gè)不可規(guī)避的問(wèn)題。多線程可以提高我們并發(fā)程序的效率,可以讓我們不去頻繁的申請(qǐng)和釋放線程,這是一個(gè)很大的花銷(xiāo),而在線程池當(dāng)中就不需要去頻繁的申請(qǐng)線程,他的主要原理是申請(qǐng)完線程之后并不中斷,而是不斷的去隊(duì)列當(dāng)中領(lǐng)取任務(wù),然后執(zhí)行,反復(fù)這樣的操作。在本篇文章當(dāng)中我們主要是介紹線程池的原理,因此我們會(huì)自己寫(xiě)一個(gè)非常非常簡(jiǎn)單的線程池,主要幫助大家理解線程池的核心原理?。?!

臨武網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)公司!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。成都創(chuàng)新互聯(lián)公司2013年至今到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專(zhuān)注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)公司。

線程池給我們提供的功能

我們首先來(lái)看一個(gè)使用線程池的例子:

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class Demo01 {   public static void main(String[] args) {    ExecutorService pool = Executors.newFixedThreadPool(5);    for (int i = 0; i < 100; i++) {      pool.execute(new Runnable() {        @Override        public void run() {          for (int i = 0; i < 100; i++) {            System.out.println(Thread.currentThread().getName() + " print " + i);          }        }      });    }  }}

在上面的例子當(dāng)中,我們使用Executors.newFixedThreadPool去生成來(lái)一個(gè)固定線程數(shù)目的線程池,在上面的代碼當(dāng)中我們是使用5個(gè)線程,然后通過(guò)execute方法不斷的去向線程池當(dāng)中提交任務(wù),大致流程如下圖所示:

線程池通過(guò)execute函數(shù)不斷的往線程池當(dāng)中的任務(wù)隊(duì)列加入任務(wù),而線程池當(dāng)中的線程會(huì)不斷的從任務(wù)隊(duì)列當(dāng)中取出任務(wù),然后進(jìn)行執(zhí)行,然后繼續(xù)取任務(wù),繼續(xù)執(zhí)行....,線程的執(zhí)行過(guò)程如下:

while (true) {  Runnable runnable = taskQueue.take(); // 從任務(wù)隊(duì)列當(dāng)中取出任務(wù)  runnable.run(); // 執(zhí)行任務(wù)}

根據(jù)上面所談到的內(nèi)容,現(xiàn)在我們的需求很清晰了,首先我們需要有一個(gè)隊(duì)列去存儲(chǔ)我們所需要的任務(wù),然后需要開(kāi)啟多個(gè)線程不斷的去任務(wù)隊(duì)列當(dāng)中取出任務(wù),然后進(jìn)行執(zhí)行,然后重復(fù)取任務(wù)執(zhí)行任務(wù)的操作。

工具介紹

在我們前面提到的線程池實(shí)現(xiàn)的原理當(dāng)中有一個(gè)非常重要的數(shù)據(jù)結(jié)構(gòu),就是ArrayBlockingQueue阻塞隊(duì)列,它是一個(gè)并發(fā)安全的數(shù)據(jù)結(jié)構(gòu),我們首先先簡(jiǎn)單介紹一下這個(gè)數(shù)據(jù)結(jié)構(gòu)的使用方法。(如果你想深入了解阻塞隊(duì)列的實(shí)現(xiàn)原理,可以參考這篇文章JDK數(shù)組阻塞隊(duì)列源碼剖析)

我們主要用的是ArrayBlockingQueue的下面兩個(gè)方法:

  • put函數(shù),這個(gè)函數(shù)是往線程當(dāng)中加入數(shù)據(jù)的。我們需要了解的是,如果一個(gè)線程調(diào)用了這個(gè)函數(shù)往隊(duì)列當(dāng)中加入數(shù)據(jù),如果此時(shí)隊(duì)列已經(jīng)滿了則線程需要被掛起,如果沒(méi)有滿則需要將數(shù)據(jù)加入到隊(duì)列當(dāng)中,也就是將數(shù)據(jù)存儲(chǔ)到數(shù)組當(dāng)中。
  • take函數(shù),從隊(duì)列當(dāng)中取出數(shù)據(jù),但是當(dāng)隊(duì)列為空的時(shí)候需要將調(diào)用這個(gè)方法的線程阻塞。當(dāng)隊(duì)列當(dāng)中有數(shù)據(jù)的時(shí)候,就可以從隊(duì)列當(dāng)中取出數(shù)據(jù)。
  • 需要注意的是,如果一個(gè)線程被上面兩個(gè)任何一個(gè)線程阻塞之后,可以調(diào)用對(duì)應(yīng)線程的interrupt方法終止線程的執(zhí)行,同時(shí)還會(huì)拋出一個(gè)異常。

下面是一份測(cè)試代碼:

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit; public class QueueTest {   public static void main(String[] args) throws InterruptedException {    ArrayBlockingQueue queue = new ArrayBlockingQueue(5); // 隊(duì)列的容量為5    Thread thread = new Thread(() -> {      for (int i = 0; i < 10; i++) {        try {          queue.put(i);          System.out.println("數(shù)據(jù) " + i + "被加入到隊(duì)列當(dāng)中");        } catch (InterruptedException e) {          System.out.println("出現(xiàn)了中斷異常");          // 如果出現(xiàn)中斷異常 則退出 線程就不會(huì)一直在 put 方法被掛起了          return;        }finally {        }      }    });    thread.start();    TimeUnit.SECONDS.sleep(1);    thread.interrupt();  }}

上面代碼輸出結(jié)果:

數(shù)據(jù) 0被加入到隊(duì)列當(dāng)中數(shù)據(jù) 1被加入到隊(duì)列當(dāng)中數(shù)據(jù) 2被加入到隊(duì)列當(dāng)中數(shù)據(jù) 3被加入到隊(duì)列當(dāng)中數(shù)據(jù) 4被加入到隊(duì)列當(dāng)中出現(xiàn)了中斷異常

上面代碼的執(zhí)行順序是:

線程thread會(huì)將0-4這5個(gè)數(shù)據(jù)加入到隊(duì)列當(dāng)中,但是在加入第6個(gè)數(shù)據(jù)的時(shí)候,阻塞隊(duì)列已經(jīng)滿了,因此在加入數(shù)據(jù)的時(shí)候線程thread會(huì)被阻塞,然后主線程在休息一秒之后中斷了線程thread,然后線程thread發(fā)生了中斷異常,然后被捕獲進(jìn)入catch代碼塊,然后函數(shù)返回,線程thread就不會(huì)一直被阻塞了,這一點(diǎn)在我們后面寫(xiě)線程池非常重要!??!

Worker設(shè)計(jì)

在前文當(dāng)中我們已經(jīng)提到了我們的線程需要不斷的去任務(wù)隊(duì)列里面取出任務(wù)然后執(zhí)行,我們?cè)O(shè)計(jì)一個(gè)Worker類(lèi)去做這件事!

  • 首先在類(lèi)當(dāng)中肯定需要有一個(gè)線程池的任務(wù)隊(duì)列,因?yàn)閣orker需要不斷的從阻塞隊(duì)列當(dāng)中取出任務(wù)進(jìn)行執(zhí)行。
  • 我們用一個(gè)isStopped變量表示線程是否需要終止了,也就是線程池是否需要關(guān)閉,如果線程池需要關(guān)閉了,那么線程也應(yīng)該停止了。
  • 我們還需要有一個(gè)變量記錄執(zhí)行任務(wù)的線程,因?yàn)楫?dāng)我們需要關(guān)閉線程池的時(shí)候需要等待任務(wù)隊(duì)列當(dāng)中所有的任務(wù)執(zhí)行完成,那么當(dāng)所有的任務(wù)都執(zhí)行執(zhí)行完成的時(shí)候,隊(duì)列肯定是空的,而如果這個(gè)時(shí)候有線程還去取任務(wù),那么肯定會(huì)被阻塞,前面已經(jīng)提到了ArrayBlockingQueue的使用方法了,我們可以使用這個(gè)線程的interrupt的方法去中斷這個(gè)線程的執(zhí)行,這個(gè)線程會(huì)出現(xiàn)異常,然后這個(gè)線程捕獲這個(gè)異常就可以退出了,因此我們需要知道對(duì)那個(gè)線程執(zhí)行interrupt方法!

Worker實(shí)現(xiàn)的代碼如下:

import java.util.concurrent.ArrayBlockingQueue; public class Worker implements Runnable {   // 用于保存任務(wù)的隊(duì)列  private ArrayBlockingQueue tasks;  // 線程的狀態(tài) 是否終止  private volatile boolean isStopped;   // 保存執(zhí)行 run 方法的線程  private volatile Thread thisThread;   public Worker(ArrayBlockingQueue tasks) {    // 這個(gè)參數(shù)是線程池當(dāng)中傳入的    this.tasks = tasks;  }   @Override  public void run() {    thisThread = Thread.currentThread();    while (!isStopped) {      try {        Runnable task = tasks.take();        task.run();      } catch (InterruptedException e) {        // do nothing      }    }  }  // 注意是其他線程調(diào)用這個(gè)方法 同時(shí)需要注意是 thisThread 這個(gè)線程在執(zhí)行上面的 run 方法  // 其他線程調(diào)用 thisThread 的 interrupt 方法之后 thisThread 會(huì)出現(xiàn)異常 然后就不會(huì)一直阻塞了  // 會(huì)判斷 isStopped 是否為 true 如果為 true 的話就可以退出 while 循環(huán)了  public void stop() {    isStopped = true;    thisThread.interrupt(); // 中斷線程 thisThread  }   public boolean isStopped(){    return isStopped;  }}

線程池設(shè)計(jì)

  • 首先線程池需要可以指定有多少個(gè)線程,阻塞隊(duì)列的最大長(zhǎng)度,因此我們需要有這兩個(gè)參數(shù)。
  • 線程池肯定需要有一個(gè)隊(duì)列去存放通過(guò)submit函數(shù)提交的任務(wù)。
  • 需要有一個(gè)變量存儲(chǔ)所有的woker,因?yàn)榫€程池關(guān)閉的時(shí)候需要將這些worker都停下來(lái),也就是調(diào)用worker的stop方法。
  • 需要有一個(gè)shutDown函數(shù)表示關(guān)閉線程池。
  • 需要有一個(gè)函數(shù)能夠停止所有線程的執(zhí)行,因?yàn)殛P(guān)閉線程池就是讓所有線程的工作停下來(lái)。

線程池實(shí)現(xiàn)代碼:

import java.util.ArrayList;import java.util.concurrent.ArrayBlockingQueue; public class MyFixedThreadPool {  // 用于存儲(chǔ)任務(wù)的阻塞隊(duì)列  private ArrayBlockingQueue taskQueue;   // 保存線程池當(dāng)中所有的線程  private ArrayList threadLists;   // 線程池是否關(guān)閉  private boolean isShutDown;   // 線程池當(dāng)中的線程數(shù)目  private int numThread;   public MyFixedThreadPool(int i) {    this(Runtime.getRuntime().availableProcessors() + 1, 1024);  }   public MyFixedThreadPool(int numThread, int maxTaskNumber) {    this.numThread = numThread;    taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 創(chuàng)建阻塞隊(duì)列    threadLists = new ArrayList<>();    // 將所有的 worker 都保存下來(lái)    for (int i = 0; i < numThread; i++) {      Worker worker = new Worker(taskQueue);      threadLists.add(worker);    }    for (int i = 0; i < threadLists.size(); i++) {      new Thread(threadLists.get(i),              "ThreadPool-Thread-" + i).start(); // 讓worker開(kāi)始工作    }  }   // 停止所有的 worker 這個(gè)只在線程池要關(guān)閉的時(shí)候才會(huì)調(diào)用  private void stopAllThread() {    for (Worker worker : threadLists) {      worker.stop(); // 調(diào)用 worker 的 stop 方法 讓正在執(zhí)行 worker 當(dāng)中 run 方法的線程停止執(zhí)行    }  }   public void shutDown() {    // 等待任務(wù)隊(duì)列當(dāng)中的任務(wù)執(zhí)行完成    while (taskQueue.size() != 0) {      // 如果隊(duì)列當(dāng)中還有任務(wù) 則讓出 CPU 的使用權(quán)      Thread.yield();    }    // 在所有的任務(wù)都被執(zhí)行完成之后 停止所有線程的執(zhí)行    stopAllThread();  }   public void submit(Runnable runnable) {    try {      taskQueue.put(runnable); // 如果任務(wù)隊(duì)列滿了, 調(diào)用這個(gè)方法的線程會(huì)被阻塞    } catch (InterruptedException e) {      e.printStackTrace();    }  }}

測(cè)試代碼:

public class Test {   public static void main(String[] args) {    MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 開(kāi)啟5個(gè)線程 任務(wù)隊(duì)列當(dāng)中最多只能存1024個(gè)任務(wù)    for (int i = 0; i < 1000000; i++) {      pool.submit(() -> {        System.out.println(Thread.currentThread().getName()); // 提交的任務(wù)就是打印線程自己的名字      });    }    pool.shutDown();  }}

上面的代碼是可以正常執(zhí)行并且結(jié)束的,這個(gè)輸出太長(zhǎng)了這里只列出部分輸出結(jié)果:

ThreadPool-Thread-0ThreadPool-Thread-4ThreadPool-Thread-0ThreadPool-Thread-1ThreadPool-Thread-3ThreadPool-Thread-1ThreadPool-Thread-3ThreadPool-Thread-3ThreadPool-Thread-3ThreadPool-Thread-3ThreadPool-Thread-3ThreadPool-Thread-2ThreadPool-Thread-3ThreadPool-Thread-2ThreadPool-Thread-1ThreadPool-Thread-0ThreadPool-Thread-0ThreadPool-Thread-0ThreadPool-Thread-1ThreadPool-Thread-4

從上面的輸出我們可以看見(jiàn)線程池當(dāng)中只有5個(gè)線程,這5個(gè)線程在不斷從隊(duì)列當(dāng)中取出任務(wù)然后執(zhí)行,因?yàn)槲覀兛梢钥吹酵粋€(gè)線程的名字輸出了多次。

總結(jié)

在本篇文章當(dāng)中主要介紹了線程池的原理,以及我們應(yīng)該去如何設(shè)計(jì)一個(gè)線程池,同時(shí)也介紹了在阻塞隊(duì)列當(dāng)中一個(gè)非常重要的數(shù)據(jù)結(jié)構(gòu)ArrayBlockingQueue的使用方法。

線程池當(dāng)中有一個(gè)阻塞隊(duì)列去存放所有被提交到線程池當(dāng)中的任務(wù)。

所有的Worker會(huì)不斷的從任務(wù)隊(duì)列當(dāng)中取出任務(wù)然后執(zhí)行。

線程池的shutDown方法其實(shí)比較難思考該怎么實(shí)現(xiàn)的,首先在我們真正關(guān)閉線程池之前需要將任務(wù)隊(duì)列當(dāng)中所有的任務(wù)執(zhí)行完成,然后將所有的線程停下來(lái)。

在所有的任務(wù)執(zhí)行完成之后,可能有的線程就會(huì)阻塞在take方法上(從隊(duì)列當(dāng)中取數(shù)據(jù)的方法,如果隊(duì)列為空線程會(huì)阻塞),好在ArrayBlockingQueue在實(shí)現(xiàn)的時(shí)候就考慮到了這個(gè)問(wèn)題,只需要其他線程調(diào)用了這個(gè)被阻塞線程的interrupt方法的話,線程就可以通過(guò)捕獲異?;謴?fù)執(zhí)行,然后判斷isStopped,如果需要停止了就跳出while循環(huán),這樣的話我們就可以完成所有線程的停止操作了。


文章標(biāo)題:徹底了解線程池的原理—40行從零開(kāi)始自己寫(xiě)線程池
網(wǎng)頁(yè)地址:http://www.5511xx.com/article/dpjgcdi.html