日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
一個(gè)Demo學(xué)會(huì)WorkerPool

本文轉(zhuǎn)載自微信公眾號(hào)「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang來啦公眾號(hào)。

成都創(chuàng)新互聯(lián)公司是工信部頒發(fā)資質(zhì)IDC服務(wù)器商,為用戶提供優(yōu)質(zhì)的服務(wù)器托管服務(wù)

四哥水平有限,如有翻譯或理解錯(cuò)誤,煩請(qǐng)幫忙指出,感謝!

今天給大家分享一篇關(guān)于 workPool 的文章,這個(gè)平時(shí)大家應(yīng)該用的比較多,一起來看下。

原文如下:

工作池是這樣一個(gè)池子,會(huì)創(chuàng)建指定數(shù)量的 worker,這些 worker 能獲取任務(wù)并處理。允許多個(gè)任務(wù)同時(shí)處理,但是需要維持固定數(shù)量的 worker 避免系統(tǒng)資源被過度使用。

通常有兩種方式創(chuàng)建任務(wù)池:

  • 一種是預(yù)先創(chuàng)建固定數(shù)量的 worker;
  • 另外一種是當(dāng)有需要的時(shí)候才會(huì)創(chuàng)建 worker,當(dāng)然也會(huì)有數(shù)量限制;

本文將與大家一起討論第一種方式。當(dāng)我們預(yù)先知道有許多任務(wù)需要同時(shí)運(yùn)行,并且很大概率會(huì)用上最大數(shù)量的 worker,通常會(huì)采用這種方式。

為了演示,我們先創(chuàng)建 Worker 結(jié)構(gòu)體,它獲取任務(wù)并執(zhí)行。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  
  4. // Worker ... 
  5. type Worker struct { 
  6.  ID       int 
  7.  Name     string 
  8.  StopChan chan bool 
  9.  
  10. // Start ... 
  11. func (w *Worker) Start(jobQueue chan Job) { 
  12.  w.StopChan = make(chan bool) 
  13.  successChan := make(chan bool) 
  14.  
  15.  go func() { 
  16.   successChan <- true 
  17.   for { 
  18.    // take job 
  19.    job := <-jobQueue 
  20.    if job != nil { 
  21.     job.Start(w) 
  22.    } else { 
  23.     fmt.Printf("worker %s to be stopped\n", w.Name) 
  24.     w.StopChan <- true 
  25.     break 
  26.    } 
  27.   } 
  28.  }() 
  29.  
  30.  // wait for the worker to start 
  31.  <-successChan 
  32.  
  33. // Stop ... 
  34. func (w *Worker) Stop() { 
  35.  // wait for the worker to stop, blocking 
  36.  _ = <-w.StopChan 
  37.  fmt.Printf("worker %s stopped\n", w.Name) 

Worker 有一些屬性保存當(dāng)前的狀態(tài),另外還聲明了兩個(gè)方法分別用于啟動(dòng)、停止 worker。

在 Start() 方法里,創(chuàng)建了兩個(gè) channel 分別用于 worker 的啟動(dòng)和停止。最重要的是 for 循環(huán)里面,worker 會(huì)一直等待獲取 job 并可執(zhí)行的直到任務(wù)隊(duì)列關(guān)閉。

Job 是包含單個(gè)方法 Start() 的接口,所以只要實(shí)現(xiàn) Start() 方法就可以有不同類型的 job。

 
 
 
  1. // Job ... 
  2. type Job interface { 
  3.  Start(worker *Worker) error 

一旦 Worker 確定之后,接下來就是創(chuàng)建 pool 來管理 workers。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  "sync" 
  4.  
  5. // Pool ... 
  6. type Pool struct { 
  7.  Name string 
  8.  
  9.  Size    int 
  10.  Workers []*Worker 
  11.  
  12.  QueueSize int 
  13.  Queue     chan Job 
  14.  
  15. // Initiualize ... 
  16. func (p *Pool) Initialize() { 
  17.  // maintain minimum 1 worker 
  18.  if p.Size < 1 { 
  19.   p.Size = 1 
  20.  } 
  21.  p.Workers = []*Worker{} 
  22.  for i := 1; i <= p.Size; i++ { 
  23.   worker := &Worker{ 
  24.    ID:   i - 1, 
  25.    Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1), 
  26.   } 
  27.   p.Workers = append(p.Workers, worker) 
  28.  } 
  29.  
  30.  // maintain min queue size as 1 
  31.  if p.QueueSize < 1 { 
  32.   p.QueueSize = 1 
  33.  } 
  34.  p.Queue = make(chan Job, p.QueueSize) 
  35.  
  36. // Start ... 
  37. func (p *Pool) Start() { 
  38.  for _, worker := range p.Workers { 
  39.   worker.Start(p.Queue) 
  40.  } 
  41.  fmt.Println("all workers started") 
  42.  
  43. // Stop ... 
  44. func (p *Pool) Stop() { 
  45.  close(p.Queue) // close the queue channel 
  46.  
  47.  var wg sync.WaitGroup 
  48.  for _, worker := range p.Workers { 
  49.   wg.Add(1) 
  50.   go func(w *Worker) { 
  51.    defer wg.Done() 
  52.  
  53.    w.Stop() 
  54.   }(worker) 
  55.  } 
  56.  wg.Wait() 
  57.  fmt.Println("all workers stopped") 

Pool 包含 worker 切片和用于保存 job 的隊(duì)列。worker 的數(shù)量在初始化的時(shí)候是可以自定義。

關(guān)鍵點(diǎn)在 Stop() 的邏輯,當(dāng)它被調(diào)用時(shí),會(huì)先關(guān)閉 job 隊(duì)列,worker 便會(huì)從 job 隊(duì)列讀到 nil,接著就會(huì)關(guān)閉對(duì)應(yīng)的 worker。接著在 for 循環(huán)里,等待 worker 并發(fā)地停止直到最后一個(gè) worker 停止。

為了演示整體邏輯,下面的例子展示了一個(gè)僅僅輸出值的 job。

 
 
 
  1. import "fmt" 
  2.  
  3. func main() { 
  4.  pool := &Pool{ 
  5.   Name:      "test", 
  6.   Size:      5, 
  7.   QueueSize: 20, 
  8.  } 
  9.  pool.Initialize() 
  10.  pool.Start() 
  11.         defer pool.Stop() 
  12.  
  13.  for i := 1; i <= 100; i++ { 
  14.   job := &PrintJob{ 
  15.    Index: i, 
  16.   } 
  17.   pool.Queue <- job 
  18.  } 
  19.  
  20. // PrintJob ... 
  21. type PrintJob struct { 
  22.  Index int 
  23.  
  24. func (pj *PrintJob) Start(worker *Worker) error { 
  25.  fmt.Printf("job %s - %d\n", worker.Name, pj.Index) 
  26.  return nil 

如果你看了上面的代碼邏輯,就會(huì)發(fā)現(xiàn)很簡(jiǎn)單,創(chuàng)建了有 5 個(gè) worker 的工作池并且 job 隊(duì)列的大小是 20。

接著,模擬 job 創(chuàng)建和處理過程:一旦 job 被創(chuàng)建就會(huì) push 到任務(wù)隊(duì)列里,等待著的 worker 便會(huì)從隊(duì)列里取出 job 并處理。

類似下面這樣的輸出:

 
 
 
  1. all workers started 
  2. job test-worker-3 - 4 
  3. job test-worker-3 - 6 
  4. job test-worker-3 - 7 
  5. job test-worker-3 - 8 
  6. job test-worker-3 - 9 
  7. job test-worker-3 - 10 
  8. job test-worker-3 - 11 
  9. job test-worker-3 - 12 
  10. job test-worker-3 - 13 
  11. job test-worker-3 - 14 
  12. job test-worker-3 - 15 
  13. job test-worker-3 - 16 
  14. job test-worker-3 - 17 
  15. job test-worker-3 - 18 
  16. job test-worker-3 - 19 
  17. job test-worker-3 - 20 
  18. worker test-worker-3 to be stopped 
  19. job test-worker-4 - 5 
  20. job test-worker-0 - 1 
  21. worker test-worker-3 stopped 
  22. job test-worker-2 - 3 
  23. worker test-worker-2 to be stopped 
  24. worker test-worker-2 stopped 
  25. worker test-worker-4 to be stopped 
  26. worker test-worker-4 stopped 
  27. worker test-worker-0 to be stopped 
  28. worker test-worker-0 stopped 
  29. job test-worker-1 - 2 
  30. worker test-worker-1 to be stopped 
  31. worker test-worker-1 stopped 
  32. all workers stopped 

via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

作者:sonic0002


文章題目:一個(gè)Demo學(xué)會(huì)WorkerPool
URL標(biāo)題:http://www.5511xx.com/article/ccogchh.html