日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Go語言的并發(fā)與WorkerPool

本文轉(zhuǎn)載自微信公眾號「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請聯(lián)系Golang來啦公眾號。

創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比巨鹿網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式巨鹿網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋巨鹿地區(qū)。費用合理售后完善,十余年實體公司更值得信賴。

四哥水平有限,如有翻譯或理解錯誤,煩請幫忙指出,感謝!

昨天分享關(guān)于 workerPool 的文章,有同學在后臺說,昨天的 Demo 恰好符合項目的業(yè)務(wù)場景,真的非常棒!

所以今天就再來分享一篇 。

原文如下:

現(xiàn)代編程語言中,并發(fā)已經(jīng)成為必不可少的特性。現(xiàn)在絕大多數(shù)編程語言都有一些方法實現(xiàn)并發(fā)。

其中一些實現(xiàn)方式非常強大,能將負載轉(zhuǎn)移到不同的系統(tǒng)線程,比如 Java 等;一些則在同一線程上模擬這種行為,比如 Ruby 等。

Golang 的并發(fā)模型非常強大,稱為 CSP(通信順序進程),它將一個問題分解成更小的順序進程,然后調(diào)度這些進程的實例(稱為 Goroutine)。這些進程通過 channel 傳遞信息實現(xiàn)通信。

本文,我們將探討如何利用 golang 的并發(fā)性,以及如何在 workerPool 使用。系列文章的第二篇,我們將探討如何構(gòu)建一個強大的并發(fā)解決方案。

一個簡單的例子

假設(shè)我們需要調(diào)用一個外部 API 接口,整個過程需要花費 100ms。如果我們需要同步地調(diào)用該接口 1000 次,則需要花費 100s。

 
 
 
  1. //// model/data.go
  2. package model
  3. type SimpleData struct {
  4.  ID int
  5. }
  6. //// basic/basic.go
  7. package basic
  8. import (
  9.  "fmt"
  10.  "github.com/Joker666/goworkerpool/model"
  11.  "time"
  12. )
  13. func Work(allData []model.SimpleData) {
  14.  start := time.Now()
  15.  for i, _ := range allData {
  16.   Process(allData[i])
  17.  }
  18.  elapsed := time.Since(start)
  19.  fmt.Printf("Took ===============> %s\n", elapsed)
  20. }
  21. func Process(data model.SimpleData) {
  22.  fmt.Printf("Start processing %d\n", data.ID)
  23.  time.Sleep(100 * time.Millisecond)
  24.  fmt.Printf("Finish processing %d\n", data.ID)
  25. }
  26. //// main.go
  27. package main
  28. import (
  29.  "fmt"
  30.  "github.com/Joker666/goworkerpool/basic"
  31.  "github.com/Joker666/goworkerpool/model"
  32.  "github.com/Joker666/goworkerpool/worker"
  33. )
  34. func main() {
  35.  // Prepare the data
  36.  var allData []model.SimpleData
  37.  for i := 0; i < 1000; i++ {
  38.   data := model.SimpleData{ ID: i }
  39.   allData = append(allData, data)
  40.  }
  41.  fmt.Printf("Start processing all work \n")
  42.  // Process
  43.  basic.Work(allData)
  44. }
 
 
 
  1. Start processing all work
  2. Took ===============> 1m40.226679665s

上面的代碼創(chuàng)建了 model 包,包里包含一個結(jié)構(gòu)體,這個結(jié)構(gòu)體只有一個 int 類型的成員。我們同步地處理 data,這顯然不是最佳方案,因為可以并發(fā)處理這些任務(wù)。我們換一種方案,使用 goroutine 和 channel 來處理。

異步

 
 
 
  1. //// worker/notPooled.go
  2. func NotPooledWork(allData []model.SimpleData) {
  3.  start := time.Now()
  4.  var wg sync.WaitGroup
  5.  dataCh := make(chan model.SimpleData, 100)
  6.  wg.Add(1)
  7.  go func() {
  8.   defer wg.Done()
  9.   for data := range dataCh {
  10.    wg.Add(1)
  11.    go func(data model.SimpleData) {
  12.     defer wg.Done()
  13.     basic.Process(data)
  14.    }(data)
  15.   }
  16.  }()
  17.  for i, _ := range allData {
  18.   dataCh <- allData[i]
  19.  }
  20.  close(dataCh)
  21.  wg.Wait()
  22.  elapsed := time.Since(start)
  23.  fmt.Printf("Took ===============> %s\n", elapsed)
  24. }
  25. //// main.go
  26. // Process
  27. worker.NotPooledWork(allData)
 
 
 
  1. Start processing all work
  2. Took ===============> 101.191534ms

上面的代碼,我們創(chuàng)建了容量 100 的緩存 channel,并通過 NoPooledWork() 將數(shù)據(jù) push 到 channel 里。channel 長度滿 100 之后,我們是無法再向其中添加元素直到有元素被讀取走。使用 for range 讀取 channel,并生成 goroutine 處理。這里我們沒有限制生成 goroutine 的數(shù)量,這可以盡可能多地處理任務(wù)。從理論上來講,在給定所需資源的情況下,可以處理盡可能多的數(shù)據(jù)。執(zhí)行代碼,完成 1000 個任務(wù)只花費了 100ms。很瘋狂吧!不全是,接著往下看。

問題

除非我們擁有地球上所有的資源,否則在特定時間內(nèi)能夠分配的資源是有限的。一個 goroutine 占用的最小內(nèi)存是 2k,但也能達到 1G。上述并發(fā)執(zhí)行所有任務(wù)的解決方案中,假設(shè)有一百萬個任務(wù),就會很快耗盡機器的內(nèi)存和 CPU。我們要么升級機器的配置,要么就尋找其他更好的解決方案。

計算機科學家很久之前就考慮過這個問題,并提出了出色的解決方案 - 使用 Thread Pool 或者 Worker Pool。這個方案是使用 worker 數(shù)量受限的工作池來處理任務(wù),workers 會按順序一個接一個處理任務(wù),這樣就避免了 CPU 和內(nèi)存使用急速增長。

解決方案:Worker Pool

我們通過實現(xiàn) worker pool 來修復(fù)之前遇到的問題。

 
 
 
  1. //// worker/pooled.go
  2. func PooledWork(allData []model.SimpleData) {
  3.  start := time.Now()
  4.  var wg sync.WaitGroup
  5.  workerPoolSize := 100
  6.  dataCh := make(chan model.SimpleData, workerPoolSize)
  7.  for i := 0; i < workerPoolSize; i++ {
  8.   wg.Add(1)
  9.   go func() {
  10.    defer wg.Done()
  11.    for data := range dataCh {
  12.     basic.Process(data)
  13.    }
  14.   }()
  15.  }
  16.  for i, _ := range allData {
  17.   dataCh <- allData[i]
  18.  }
  19.  close(dataCh)
  20.  wg.Wait()
  21.  elapsed := time.Since(start)
  22.  fmt.Printf("Took ===============> %s\n", elapsed)
  23. }
  24. //// main.go
  25. // Process
  26. worker.PooledWork(allData)
 
 
 
  1. Start processing all work
  2. Took ===============> 1.002972449s

上面的代碼,worker 數(shù)量限制在 100,我們創(chuàng)建了相應(yīng)數(shù)量的 goroutine 來處理任務(wù)。我們可以把 channel 看作是隊列,worker goroutine 看作是消費者。多個 goroutine 可以監(jiān)聽同一個 channel,但是 channel 里的每一個元素只會被處理一次。

Go 語言的 channel 可以當作隊列使用。

這是一個比較好的解決方案,執(zhí)行代碼,我們看到完成所有任務(wù)花費 1s。雖然沒有 100ms 這么快,但已經(jīng)能滿足業(yè)務(wù)需要,而且我們得到了一個更好的解決方案,能將負載均攤在不同的時間片上。

處理錯誤

我們能做的還沒完。上面看起來是一個完整的解決方案,但卻不是的,我們沒有處理錯誤情況。所以需要模擬出錯的情形,并且看下我們需要怎么處理。

 
 
 
  1. //// worker/pooledError.go
  2. func PooledWorkError(allData []model.SimpleData) {
  3.  start := time.Now()
  4.  var wg sync.WaitGroup
  5.  workerPoolSize := 100
  6.  dataCh := make(chan model.SimpleData, workerPoolSize)
  7.  errors := make(chan error, 1000)
  8.  for i := 0; i < workerPoolSize; i++ {
  9.   wg.Add(1)
  10.   go func() {
  11.    defer wg.Done()
  12.    for data := range dataCh {
  13.     process(data, errors)
  14.    }
  15.   }()
  16.  }
  17.  for i, _ := range allData {
  18.   dataCh <- allData[i]
  19.  }
  20.  close(dataCh)
  21.  wg.Add(1)
  22.  go func() {
  23.   defer wg.Done()
  24.   for {
  25.    select {
  26.    case err := <-errors:
  27.     fmt.Println("finished with error:", err.Error())
  28.    case <-time.After(time.Second * 1):
  29.     fmt.Println("Timeout: errors finished")
  30.     return
  31.    }
  32.   }
  33.  }()
  34.  defer close(errors)
  35.  wg.Wait()
  36.  elapsed := time.Since(start)
  37.  fmt.Printf("Took ===============> %s\n", elapsed)
  38. }
  39. func process(data model.SimpleData, errors chan<- error) {
  40.  fmt.Printf("Start processing %d\n", data.ID)
  41.  time.Sleep(100 * time.Millisecond)
  42.  if data.ID % 29 == 0 {
  43.   errors <- fmt.Errorf("error on job %v", data.ID)
  44.  } else {
  45.   fmt.Printf("Finish processing %d\n", data.ID)
  46.  }
  47. }
  48. //// main.go
  49. // Process
  50. worker.PooledWorkError(allData)

我們修改了 process() 函數(shù),處理一些隨機的錯誤并將錯誤 push 到 errors chnanel 里。所以,為了處理并發(fā)出現(xiàn)的錯誤,我們可以使用 errors channel 保存錯誤數(shù)據(jù)。在所有任務(wù)處理完成之后,可以檢查錯誤 channel 是否有數(shù)據(jù)。錯誤 channel 里的元素保存了任務(wù) ID,方便需要的時候再處理這些任務(wù)。

比之前沒處理錯誤,很明顯這是一個更好的解決方案。但我們還可以做得更好,

我們將在下篇文章討論如何編寫一個強大的 worker pool 包,并且在 worker 數(shù)量受限的情況下處理并發(fā)任務(wù)。

總結(jié)

Go 語言的并發(fā)模型足夠強大給力,只需要構(gòu)建一個 worker pool 就能很好地解決問題而無需做太多工作,這就是它沒有包含在標準庫中的原因。但是,我們自己可以構(gòu)建一個滿足自身需求的方案。很快,我會在下一篇文章中講到,敬請期待!

點擊【閱讀原文】直達代碼倉庫[1]。

參考資料

[1]代碼倉庫: https://github.com/Joker666/goworkerpool?ref=hackernoon.com

via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-1-e9n31ao

作者:Hasan


文章題目:Go語言的并發(fā)與WorkerPool
網(wǎng)頁地址:http://www.5511xx.com/article/coieses.html