新聞中心
本文轉載自微信公眾號「網管叨bi叨」,作者KevinYan11 。轉載本文請聯系網管叨bi叨公眾號。

前言
上一篇文章 我用休眠做并發(fā)控制,搞垮了下游服務 發(fā)出去后得到不少網友的回應,有人問自己平時用的方案行不行,有人建議借鑒TCP的擁塞控制策略,動態(tài)地調整發(fā)起的并發(fā)數,還有人問為啥我要管下游抗不抗得住。
今天我就來總結幾種調用下游服務時做并發(fā)控制的方案。
因為我們這篇文章是科普向的文章,主要目的是總結一下應該怎么在享受并發(fā)帶來效率提升的同時做好并發(fā)控制讓整個系統的上下游都能更穩(wěn)定一些,不對限流、控制到底該哪個服務加,出了事故誰負責做討論。
并發(fā)控制方案
前面我們提到用休眠做并發(fā)控制的最大弊端是,沒有考慮下游服務的感受,每次開固定數量的goroutine 去執(zhí)行任務后,調用者休眠 1s 再來,而不是等待下游服務的反饋再開啟下一批任務執(zhí)行。
- func badConcurrency() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- go func(i int) {
- doSomething(i)
- }(item)
- }
- time.Sleep(time.Second * 1)
- }
- }
此外上游還有請求分配不均的問題,休眠的時候完全沒有請求,休眠結束后不管下游有沒有執(zhí)行完成馬上又發(fā)起一批新的請求。
所以我們應該從等待下游反饋和請求分配盡量均勻兩個角度去做并發(fā)控制,當然實際項目中應該是兩方面結合才行。
本文的可執(zhí)行示例代碼請訪問下面的鏈接查看:
https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go
使用限流器
我們在向下游發(fā)起并發(fā)請求時可以通過限流器做一下限流,如果達到限制就阻塞直到能再次發(fā)起請求。一聽到阻塞直到blabla 有的同學是不是馬上內心小激動想用 channel 去實現一個限流器啦,「此處應用咳嗽聲」其實完全沒必要Golang 官方限流器 time/rate包的 Wait 方法就能給我們提供了這個功能。
- func useRateLimit() {
- limiter := rate.NewLimiter(rate.Every(1*time.Second), 500)
- batchSize := 500
- for {
- data, _ :=queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- // 阻塞直到令牌桶有充足的Token
- err := limiter.Wait(context.Background())
- if err != nil {
- fmt.Println("Error: ", err)
- return
- }
- go func(i int) {
- doSomething(i)
- }(item)
- }
- }
- }
- // 模擬調用下游服務
- func doSomething(i int) {
- time.Sleep(2 * time.Second)
- fmt.Println("End:", i)
- }
- // 模擬查詢N條數據
- func queryDataWithSizeN(size int) (dataList []int, err error) {
- rand.Seed(time.Now().Unix())
- dataList = rand.Perm(size)
- return
- }
time/rate包提供的限流器采用的是令牌桶算法,使用Wait方法是當桶中沒有足夠的令牌時調用者會阻塞直到能取到令牌,當然也可以通過Wait方法接受的Context參數設置等待超時時間。限流器往桶中放令牌的速率是恒定的這樣比單純使用time.Sleep請求更均勻些。
關于time/rate 限流器的使用方法的詳解,請查看我之前的文章:Golang官方限流器的用法詳解
用了限流器了之后,只是讓我們的并發(fā)請求分布地更均勻了,最好我們能在受到下游反饋完成后再開始下次并發(fā)。
使用WaitGroup
我們可以等上批并發(fā)請求都執(zhí)行完后再開始下一批任務,估計大部分同學聽到這馬上就會想到應該加WaitGroup
WaitGroup適合用于并發(fā)-等待的場景:一個goroutine在檢查點(Check Point)等待一組執(zhí)行任務的 worker goroutine 全部完成,如果在執(zhí)行任務的這些worker goroutine 還沒全部完成,等待的 goroutine 就會阻塞在檢查點,直到所有woker goroutine 都完成后才能繼續(xù)執(zhí)行。
- func useWaitGroup() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- var wg sync.WaitGroup
- for _, item := range data {
- wg.Add(1)
- go func(i int) {
- doSomething(i)
- wg.Done()
- }(item)
- }
- wg.Wait()
- fmt.Println("Next bunch of data")
- }
- }
這里調用程序會等待這一批任務都執(zhí)行完后,再開始查下一批數據進行下一批請求,等待時間取決于這一批請求中最晚返回的那個響應用了多少時間。
使用Semaphore
如果你不想等一批全部完成后再開始下一批,也可以采用一個完成后下一個補上的策略,這種比使用WaitGroup做并發(fā)控制,如果下游資源夠,整個任務的處理時間會更快一些。這種策略需要使用信號量(Semaphore)做并發(fā)控制,Go 語言里通過擴展庫golang.org/x/sync/semaphore 提供了信號量并發(fā)原語。
關于信號量的使用方法和實現原理,可以讀讀我以前的文章:并發(fā)編程-信號量的使用方法和其實現原理
上面的程序改為使用信號量semaphore.Weighted做并發(fā)控制的示例如下:
- func useSemaphore() {
- var concurrentNum int64 = 10
- var weight int64 = 1
- var batchSize int = 50
- s := semaphore.NewWeighted(concurrentNum)
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- s.Acquire(context.Background(), weight)
- go func(i int) {
- doSomething(i)
- s.Release(weight)
- }(item)
- }
- }
- }
使用生產者消費者模式
也有不少讀者回復說得加線程池才行,因為每個人公司里可能都有在用的線程池實現,直接用就行,我在這里就不再獻丑給大家實現線程池了。在我看來我們其實是需要實現一個生產者和消費者模式,讓線程池幫助我們限制只有固定數量的消費者線程去做下游服務的調用,而生產者則是將數據存儲里取出來。
channel 正好能夠作為兩者之間的媒介。
- func useChannel() {
- batchSize := 50
- dataChan := make(chan int)
- var wg sync.WaitGroup
- wg.Add(batchSize + 1)
- // 生產者
- go func() {
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- dataChan <- item
- }
- }
- close(dataChan)
- wg.Done()
- }()
- // 消費者
- go func() {
- for i := 0; i < 50; i++ {
- go func() {
- for {
- select {
- case v, ok := <- dataChan:
- if !ok {
- wg.Done()
- return
- }
- doSomething(v)
- }
- }
- }()
- }
- }()
- wg.Wait()
- }
這個代碼實現里,如果用ErrorGroup代替WaitGroup的話還能更簡化一些,這個就留給讀者自己探索吧。
關于ErrorGroup的用法總結,推薦閱讀文章:覺得WaitGroup不好用?試試ErrorGroup吧!
總結
通過文章里總結的一些方法,我們也能看出來并發(fā)編程的場景下,除了關注發(fā)起的并發(fā)線程數外,更重要的是還需要關注被異步調用的下層服務的反饋,不是一味的加并發(fā)數就能解決問題的。理解我們?yōu)槭裁丛诓l(fā)編程中要關注下層服務的反饋是很重要的,否則我們列舉的那些方案其實都可以在goroutine里再開goroutine,不關心是否執(zhí)行完成直接返回,無限套娃下去。
文章標題:幾個預防并發(fā)搞垮下游服務的方法
分享鏈接:http://www.5511xx.com/article/codepdj.html


咨詢
建站咨詢
