掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
哈嘍,大家好,我是asong。在寫上一篇文章請勿濫用goroutine時,發(fā)現(xiàn)Go語言擴展包提供了一個帶權(quán)重的信號量庫Semaphore,使用信號量我們可以實現(xiàn)一個"工作池"控制一定數(shù)量的goroutine并發(fā)工作。因為對源碼抱有好奇的態(tài)度,所以在周末仔細看了一下這個庫并進行了解析,在這里記錄一下。

目前創(chuàng)新互聯(lián)公司已為上千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬空間、網(wǎng)站托管運營、企業(yè)網(wǎng)站設(shè)計、海東網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
要想知道一個東西是什么,我都愛去百度百科上搜一搜,輸入"信號量",這答案不就來了。
百度百科解釋:
信號量(Semaphore),有時被稱為信號燈,是[多線程環(huán)境下使用的一種設(shè)施,是可以用來保證兩個或多個關(guān)鍵代碼段不被并發(fā)調(diào)用。在進入一個關(guān)鍵代碼段之前,線程必須獲取一個信號量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號量。其它想進入該關(guān)鍵代碼段的線程必須等待直到第一個線程釋放信號量。為了完成這個過程,需要創(chuàng)建一個信號量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個關(guān)鍵代碼段的首末端。確認這些信號量VI引用的是初始創(chuàng)建的信號量。
通過這段解釋我們可以得知什么是信號量,其實信號量就是一種變量或者抽象數(shù)據(jù)類型,用于控制并發(fā)系統(tǒng)中多個進程對公共資源的訪問,訪問具有原子性。信號量主要分為兩類:
信號量是由操作系統(tǒng)來維護的,信號量只能進行兩種操作等待和發(fā)送信號,操作總結(jié)來說,核心就是PV操作:
在信號量進行PV操作時都為原子操作,并且在PV原語執(zhí)行期間不允許有中斷的發(fā)生。
PV原語對信號量的操作可以分為三種情況:
具體在什么場景使用本文就不在繼續(xù)分析,接下來我們重點來看一下Go語言提供的擴展包Semaphore,看看它是怎樣實現(xiàn)的。
我們之前在分析Go語言源碼時總會看到這幾個函數(shù):
- func runtime_Semacquire(s *uint32)
- func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
- func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
這幾個函數(shù)就是信號量的PV操作,不過他們都是給Go內(nèi)部使用的,如果想使用信號量,那就可以使用官方的擴展包:Semaphore,這是一個帶權(quán)重的信號量,接下來我們就重點分析一下這個庫。
安裝方法:go get -u golang.org/x/sync
- type Weighted struct {
- size int64 // 設(shè)置一個最大權(quán)值
- cur int64 // 標(biāo)識當(dāng)前已被使用的資源數(shù)
- mu sync.Mutex // 提供臨界區(qū)保護
- waiters list.List // 阻塞等待的調(diào)用者列表
- }
semaphore庫核心結(jié)構(gòu)就是Weighted,主要有4個字段:
- type waiter struct {
- n int64 // 等待調(diào)用者權(quán)重值
- ready chan<- struct{} // close channel就是喚醒
- }
這里只有兩個字段:
semaphore還提供了一個創(chuàng)建Weighted對象的方法,在初始化時需要給定最大權(quán)值:
- // NewWeighted為并發(fā)訪問創(chuàng)建一個新的加權(quán)信號量,該信號量具有給定的最大權(quán)值。
- func NewWeighted(n int64) *Weighted {
- w := &Weighted{size: n}
- return w
- }
先直接看代碼吧:
- func (s *Weighted) Acquire(ctx context.Context, n int64) error {
- s.mu.Lock() // 加鎖保護臨界區(qū)
- // 有資源可用并且沒有等待獲取權(quán)值的goroutine
- if s.size-s.cur >= n && s.waiters.Len() == 0 {
- s.cur += n // 加權(quán)
- s.mu.Unlock() // 釋放鎖
- return nil
- }
- // 要獲取的權(quán)值n大于最大的權(quán)值了
- if n > s.size {
- // 先釋放鎖,確保其他goroutine調(diào)用Acquire的地方不被阻塞
- s.mu.Unlock()
- // 阻塞等待context的返回
- <-ctx.Done()
- return ctx.Err()
- }
- // 走到這里就說明現(xiàn)在沒有資源可用了
- // 創(chuàng)建一個channel用來做通知喚醒
- ready := make(chan struct{})
- // 創(chuàng)建waiter對象
- w := waiter{n: n, ready: ready}
- // waiter按順序入隊
- elem := s.waiters.PushBack(w)
- // 釋放鎖,等待喚醒,別阻塞其他goroutine
- s.mu.Unlock()
- // 阻塞等待喚醒
- select {
- // context關(guān)閉
- case <-ctx.Done():
- err := ctx.Err() // 先獲取context的錯誤信息
- s.mu.Lock()
- select {
- case <-ready:
- // 在context被關(guān)閉后被喚醒了,那么試圖修復(fù)隊列,假裝我們沒有取消
- err = nil
- default:
- // 判斷是否是第一個元素
- isFront := s.waiters.Front() == elem
- // 移除第一個元素
- s.waiters.Remove(elem)
- // 如果是第一個元素且有資源可用通知其他waiter
- if isFront && s.size > s.cur {
- s.notifyWaiters()
- }
- }
- s.mu.Unlock()
- return err
- // 被喚醒了
- case <-ready:
- return nil
- }
- }
注釋已經(jīng)加到代碼中了,總結(jié)一下這個方法主要有三個流程:
- func main() {
- s := semaphore.NewWeighted(3)
- ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2)
- defer cancel()
- for i :=0; i < 3; i++{
- if i != 0{
- go func(num int) {
- if err := s.Acquire(ctx,3); err != nil{
- fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
- return
- }
- time.Sleep(2 * time.Second)
- fmt.Printf("goroutine: %d run over\n",num)
- s.Release(3)
- }(i)
- }else {
- go func(num int) {
- ct,cancel := context.WithTimeout(context.Background(), time.Second * 3)
- defer cancel()
- if err := s.Acquire(ct,3); err != nil{
- fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
- return
- }
- time.Sleep(3 * time.Second)
- fmt.Printf("goroutine: %d run over\n",num)
- s.Release(3)
- }(i)
- }
- }
- time.Sleep(10 * time.Second)
- }
上面的例子中g(shù)oroutine:0 使用ct對象來做控制,超時時間為3s,goroutine:1和goroutine:2對象使用ctx對象來做控制,超時時間為2s,這三個goroutine占用的資源都等于最大資源數(shù),也就是說只能有一個goruotine運行成功,另外兩個goroutine都會被阻塞,因為goroutine是搶占式調(diào)度,所以我們不能確定哪個gouroutine會第一個被執(zhí)行,這里我們假設(shè)第一個獲取到信號量的是gouroutine:2,阻塞等待的調(diào)用者列表順序是:goroutine:1 -> goroutine:0,因為在goroutine:2中有一個2s的延時,所以會觸發(fā)ctx的超時,ctx會下發(fā)Done信號,因為goroutine:2和goroutine:1都是被ctx控制的,所以就會把goroutine:1從等待者隊列中取消,但是因為goroutine:1屬于隊列的第一個隊員,并且因為goroutine:2已經(jīng)釋放資源,那么就會喚醒goroutine:0繼續(xù)執(zhí)行,畫個圖表示一下:
使用這種方式可以避免goroutine永久失眠。
- func (s *Weighted) TryAcquire(n int64) bool {
- s.mu.Lock() // 加鎖
- // 有資源可用并且沒有等待獲取資源的goroutine
- success := s.size-s.cur >= n && s.waiters.Len() == 0
- if success {
- s.cur += n
- }
- s.mu.Unlock()
- return success
- }
這個方法就簡單很多了,不阻塞地獲取權(quán)重為n的信號量,成功時返回true,失敗時返回false并保持信號量不變。
- func (s *Weighted) Release(n int64) {
- s.mu.Lock()
- // 釋放資源
- s.cur -= n
- // 釋放資源大于持有的資源,則會發(fā)生panic
- if s.cur < 0 {
- s.mu.Unlock()
- panic("semaphore: released more than held")
- }
- // 通知其他等待的調(diào)用者
- s.notifyWaiters()
- s.mu.Unlock()
- }
這里就是很常規(guī)的操作,主要就是資源釋放,同時進行安全性判斷,如果釋放資源大于持有的資源,則會發(fā)生panic。
在Acquire和Release方法中都調(diào)用了notifyWaiters,我們來分析一下這個方法:
- func (s *Weighted) notifyWaiters() {
- for {
- // 獲取等待調(diào)用者隊列中的隊員
- next := s.waiters.Front()
- // 沒有要通知的調(diào)用者了
- if next == nil {
- break // No more waiters blocked.
- }
- // 斷言出waiter信息
- w := next.Value.(waiter)
- if s.size-s.cur < w.n {
- // 沒有足夠資源為下一個調(diào)用者使用時,繼續(xù)阻塞該調(diào)用者,遵循先進先出的原則,
- // 避免需要資源數(shù)比較大的waiter被餓死
- //
- // 考慮一個場景,使用信號量作為讀寫鎖,現(xiàn)有N個令牌,N個reader和一個writer
- // 每個reader都可以通過Acquire(1)獲取讀鎖,writer寫入可以通過Acquire(N)獲得寫鎖定
- // 但不包括所有的reader,如果我們允許reader在隊列中前進,writer將會餓死-總是有一個令牌可供每個reader
- break
- }
- // 獲取資源
- s.cur += w.n
- // 從waiter列表中移除
- s.waiters.Remove(next)
- // 使用channel的close機制喚醒waiter
- close(w.ready)
- }
- }
這里只需要注意一個點:喚醒waiter采用先進先出的原則,避免需要資源數(shù)比較大的waiter被餓死。
到這里我們就把Semaphore的源代碼看了一篇,代碼行數(shù)不多,封裝的也很巧妙,那么我們該什么時候選擇使用它呢?
目前能想到一個場景就是Semaphore配合上errgroup實現(xiàn)一個"工作池",使用Semaphore限制goroutine的數(shù)量,配合上errgroup做并發(fā)控制,示例如下:
- const (
- limit = 2
- )
- func main() {
- serviceName := []string{
- "cart",
- "order",
- "account",
- "item",
- "menu",
- }
- eg,ctx := errgroup.WithContext(context.Background())
- s := semaphore.NewWeighted(limit)
- for index := range serviceName{
- name := serviceName[index]
- if err := s.Acquire(ctx,1); err != nil{
- fmt.Printf("Acquire failed and err is %s\n", err.Error())
- break
- }
- eg.Go(func() error {
- defer s.Release(1)
- return callService(name)
- })
- }
- if err := eg.Wait(); err != nil{
- fmt.Printf("err is %s\n", err.Error())
- return
- }
- fmt.Printf("run success\n")
- }
- func callService(name string) error {
- fmt.Println("call ",name)
- time.Sleep(1 * time.Second)
- return nil
- }
結(jié)果如下:
- call order
- call cart
- call account
- call item
- call menu
- run success
本文我們主要賞析了Go官方擴展庫Semaphore的實現(xiàn),他的設(shè)計思路簡單,僅僅用幾十行就完成了完美的封裝,值得我們借鑒學(xué)習(xí)。不過在實際業(yè)務(wù)場景中,我們使用信號量的場景并不多,大多數(shù)場景我們都可以使用channel來替代,但是有些場景使用Semaphore來實現(xiàn)會更好,比如上篇文章【[警惕] 請勿濫用goroutine】我們使用channel+sync來控制goroutine數(shù)量,這種實現(xiàn)方式并不好,因為實際已經(jīng)起來了多個goroutine,只不過控制了工作的goroutine數(shù)量,如果改用semaphore實現(xiàn)才是真正的控制了goroutine數(shù)量。
文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/blob/master/code_demo/semaphore_demo/semaphore.go,歡迎star。

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流