掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
GO語(yǔ)言中的?goroutine?雖然相對(duì)于系統(tǒng)線程來(lái)說(shuō)比較輕量級(jí)(初始棧大小僅2KB),(并且支持動(dòng)態(tài)擴(kuò)容),而正常采用java,c++等語(yǔ)言啟用的線程一般都是內(nèi)核態(tài)的占用的內(nèi)存資源一般在4m左右,而假設(shè)我們的服務(wù)器CPU內(nèi)存為4G,那么很明顯才用的內(nèi)核態(tài)線程的并發(fā)總數(shù)量也就是1024個(gè),相反查看一下Go語(yǔ)言的協(xié)程則可以達(dá)到4*1024*1024/2=200w.這么一看就明白了為什么Go語(yǔ)言天生支持高并發(fā)。但是在高并發(fā)量下的?goroutine?頻繁創(chuàng)建和銷毀對(duì)于性能損耗以及GC來(lái)說(shuō)壓力也不小。充分將?goroutine?復(fù)用,減少?goroutine?的創(chuàng)建/銷毀的性能損耗,這便是?grpool?對(duì)?goroutine?進(jìn)行池化封裝的目的。例如,針對(duì)于100W個(gè)執(zhí)行任務(wù),使用?goroutine?的話需要不停創(chuàng)建并銷毀100W個(gè)?goroutine?,而使用?grpool?也許底層只需要幾萬(wàn)個(gè)?goroutine?便能充分復(fù)用地執(zhí)行完成所有任務(wù)。

創(chuàng)新互聯(lián)建站致力于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作,成都網(wǎng)站設(shè)計(jì),集團(tuán)網(wǎng)站建設(shè)等服務(wù)標(biāo)準(zhǔn)化,推過(guò)標(biāo)準(zhǔn)化降低中小企業(yè)的建站的成本,并持續(xù)提升建站的定制化服務(wù)水平進(jìn)行質(zhì)量交付,讓企業(yè)網(wǎng)站從市場(chǎng)競(jìng)爭(zhēng)中脫穎而出。 選擇創(chuàng)新互聯(lián)建站,就選擇了安全、穩(wěn)定、美觀的網(wǎng)站建設(shè)服務(wù)!
經(jīng)測(cè)試,?goroutine?池對(duì)于業(yè)務(wù)邏輯的執(zhí)行效率(降低執(zhí)行時(shí)間/CPU使用率)提升不大,甚至沒有原生的?goroutine?執(zhí)行快速(池化?goroutine?執(zhí)行調(diào)度并沒有底層go調(diào)度器高效,因?yàn)槌鼗?goroutine?的執(zhí)行調(diào)度也是基于底層go調(diào)度器),但是由于采用了復(fù)用的設(shè)計(jì),池化后對(duì)內(nèi)存的使用率得到極大的降低。在v2版本中?grpool?也加入了貫穿全局的鏈路追蹤。
概念:
Pool?:?goroutine?池,用于管理若干可復(fù)用的?goroutine?協(xié)程資源; Worker?:池對(duì)象中參與任務(wù)執(zhí)行的?goroutine?,一個(gè)?Worker?可以執(zhí)行若干個(gè)?Job?,直到隊(duì)列中再無(wú)等待的?Job?; Job?:添加到池對(duì)象的任務(wù)隊(duì)列中等待執(zhí)行的任務(wù),是一個(gè)?func()?的方法,一個(gè)?Job?同時(shí)只能被一個(gè)?Worker?獲取并執(zhí)行;使用方式:
import "github.com/gogf/gf/v2/os/grpool"
使用場(chǎng)景:管理大量異步任務(wù)的場(chǎng)景、需要異步協(xié)程復(fù)用的場(chǎng)景、需要降低內(nèi)存使用率的場(chǎng)景。
接口文檔:
func Add(f func()) error
func Jobs() int
func Size() int
type Pool
func New(limit ...int) *Pool
func (p *Pool) Add(ctx context.Context, f Func) error
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error
func (p *Pool) Cap() int
func (p *Pool) Close()
func (p *Pool) IsClosed() bool
func (p *Pool) Jobs() int
func (p *Pool) Size() int
通過(guò)?grpool.New?方法創(chuàng)建一個(gè)?goroutine?池對(duì)象,參數(shù)?limit?為非必需參數(shù),用于限定池中的工作?goroutine?數(shù)量,默認(rèn)為不限制。需要注意的是,任務(wù)可以不停地往池中添加,沒有限制,但是工作的?goroutine?是可以做限制的。我們可以通過(guò)?Size()?方法查詢當(dāng)前的工作?goroutine?數(shù)量,使用?Jobs()?方法查詢當(dāng)前池中待處理的任務(wù)數(shù)量。
同時(shí),為便于使用,?grpool?包提供了默認(rèn)的?goroutine?池,默認(rèn)的池對(duì)象不限制?goroutine?數(shù)量,直接通過(guò)?grpool.Add?即可往默認(rèn)的池中添加任務(wù),任務(wù)參數(shù)必須是一個(gè) ?func()?類型的函數(shù)/方法。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtimer"
"time"
)
var (
ctx = gctx.New()
)
func job(ctx context.Context) {
time.Sleep(1*time.Second)
}
func main() {
pool := grpool.New(100)
for i := 0; i < 1000; i++ {
pool.Add(ctx,job)
}
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) {
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
fmt.Println()
})
select {}
}
這段程序中的任務(wù)函數(shù)的功能是?sleep? 1秒鐘,這樣便能充分展示出?goroutine?數(shù)量限制功能。其中,我們使用了?gtime.SetInterval?定時(shí)器每隔1秒鐘打印出當(dāng)前默認(rèn)池中的工作?goroutine?數(shù)量以及待處理的任務(wù)數(shù)量。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"sync"
)
var (
ctx = gctx.New()
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
grpool.Add(ctx,func(ctx context.Context) {
fmt.Println(i)
wg.Done()
})
}
wg.Wait()
}
我們這段代碼的目的是要順序地打印出0-9,然而運(yùn)行后卻輸出:
10
10
10
10
10
10
10
10
10
10
為什么呢?這里的執(zhí)行結(jié)果無(wú)論是采用go關(guān)鍵字來(lái)執(zhí)行還是?grpool?來(lái)執(zhí)行都是如此。原因是,對(duì)于異步線程/協(xié)程來(lái)講,函數(shù)進(jìn)行異步執(zhí)行注冊(cè)時(shí),該函數(shù)并未真正開始執(zhí)行(注冊(cè)時(shí)只在?goroutine?的棧中保存了變量?i?的內(nèi)存地址),而一旦開始執(zhí)行時(shí)函數(shù)才會(huì)去讀取變量?i?的值,而這個(gè)時(shí)候變量?i?的值已經(jīng)自增到了10。 清楚原因之后,改進(jìn)方案也很簡(jiǎn)單了,就是在注冊(cè)異步執(zhí)行函數(shù)的時(shí)候,把當(dāng)時(shí)變量i的值也一并傳遞獲取;或者把當(dāng)前變量i的值賦值給一個(gè)不會(huì)改變的臨時(shí)變量,在函數(shù)中使用該臨時(shí)變量而不是直接使用變量?i?。
改進(jìn)后的示例代碼如下:
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int){
fmt.Println(v)
wg.Done()
}(i)
}
wg.Wait()
}
執(zhí)行后,輸出結(jié)果為:
0
9
3
4
5
6
7
8
1
2
注意,異步執(zhí)行時(shí)并不會(huì)保證按照函數(shù)注冊(cè)時(shí)的順序執(zhí)行,以下同理。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"sync"
)
var (
ctx = gctx.New()
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
v := i
grpool.Add(ctx, func(ctx context.Context) {
fmt.Println(v)
wg.Done()
})
}
wg.Wait()
}
執(zhí)行后,輸出結(jié)果為:
9
0
1
2
3
4
5
6
7
8
這里可以看到,使用?grpool?進(jìn)行任務(wù)注冊(cè)時(shí),注冊(cè)方法為?func(ctx context.Context)?,因此無(wú)法在任務(wù)注冊(cè)時(shí)把變量?i?的值注冊(cè)進(jìn)去(請(qǐng)盡量不要通過(guò)?ctx?傳遞業(yè)務(wù)參數(shù)),因此只能采用臨時(shí)變量的形式來(lái)傳遞當(dāng)前變量?i?的值。
?AddWithRecover?將新作業(yè)推送到具有指定恢復(fù)功能的池中。當(dāng)?userFunc?執(zhí)行過(guò)程中出現(xiàn)?panic?時(shí),會(huì)調(diào)用可選的?Recovery Func?。如果沒有傳入?Recovery Func?或賦空,則忽略?userFunc?引發(fā)的?panic?。該作業(yè)將異步執(zhí)行。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"time"
)
var (
ctx = gctx.New()
)
func main() {
array := garray.NewArray(true)
grpool.AddWithRecover(ctx, func(ctx context.Context) {
array.Append(1)
array.Append(2)
panic(1)
}, func(err error) {
array.Append(1)
})
grpool.AddWithRecover(ctx, func(ctx context.Context) {
panic(1)
array.Append(1)
})
time.Sleep(500 * time.Millisecond)
fmt.Print(array.Len())
}
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"sync"
"time"
)
var (
ctx = gctx.New()
)
func main() {
start := gtime.TimestampMilli()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
wg.Add(1)
grpool.Add(ctx,func(ctx context.Context) {
time.Sleep(time.Millisecond)
wg.Done()
})
}
wg.Wait()
fmt.Println(grpool.Size())
fmt.Println("time spent:", gtime.TimestampMilli() - start)
}
package main
import (
"fmt"
"github.com/gogf/gf/v2/os/gtime"
"sync"
"time"
)
func main() {
start := gtime.TimestampMilli()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done()
}()
}
wg.Wait()
fmt.Println("time spent:", gtime.TimestampMilli() - start)
}
測(cè)試結(jié)果為兩個(gè)程序各運(yùn)行3次取平均值。
grpool:
goroutine count: 847313
memory spent: ~2.1 G
time spent: 37792 ms
goroutine:
goroutine count: 1000W
memory spent: ~4.8 GB
time spent: 27085 ms
可以看到池化過(guò)后,執(zhí)行相同數(shù)量的任務(wù),?goroutine?數(shù)量減少很多,相對(duì)的內(nèi)存也降低了一倍以上,CPU時(shí)間耗時(shí)也勉強(qiáng)可以接受。

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流