Golang线程(协程)池的一种设计与实现

概览:

协程 goroutine

Go 语言的协程实现被称之为 goroutine,由 Go 运行时管理,在 Go 语言中通过协程实现并发编程非常简单:我们可以在一个处理进程中通过关键字 go 启用多个协程,然后在不同的协程中完成不同的子任务,这些用户在代码中创建和维护的协程本质上是用户级线程,Go 语言运行时会在底层通过调度器将用户级线程交给操作系统的系统级线程去处理,如果在运行过程中遇到某个 IO 操作而暂停运行,调度器会将用户级线程和系统级线程分离,以便让系统级线程去处理其他用户级线程,而当 IO 操作完成,需要恢复运行,调度器又会调度空闲的系统级线程来处理这个用户级线程,从而达到并发处理多个协程的目的。

要素

  • 池大小
  • 工作基本单元
  • 组织协调工作单元

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"errors"
"log"
"runtime"
"sync"
"time"

"golang.org/x/sync/errgroup"
)

// define pool size: runtime.NumCPU()
var goroutineMaxProcess = make(chan bool, runtime.NumCPU())

func boss(eg *errgroup.Group, wg *sync.WaitGroup) {
for i := 0; i < 40; i++ {
// re use id value: if you use this value in goroutine ,you must be do this.
idx := i + 1
// count waitgroup counter
wg.Add(1)
// update pool size: start using
goroutineMaxProcess <- true
// start goroutine process and capture inside error
eg.Go(func() error {
// define if return do this
// waitgroup counter --
// update pool size: release using
defer wg.Done()
defer func() { <-goroutineMaxProcess }()

// worker start work
if err := worker(idx); err != nil {
return err
}
return nil
})
}
// check err
err := eg.Wait()
if err != nil {
// ^-^ pannic
log.Panic(err)
}
}

func worker(idx int) error {
// log print
log.Printf("%d worker working...\n", idx)
// check goroutine error
if idx == 33 {
return errors.New("sorry 33")
}
// work simulation
time.Sleep(time.Second * 1)
return nil
}

func main() {
var eg errgroup.Group
var wg sync.WaitGroup

// start work
boss(&eg, &wg)
}

效果测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# go run main.go
2022/09/21 23:49:29 8 worker working...
2022/09/21 23:49:29 3 worker working...
2022/09/21 23:49:29 5 worker working...
2022/09/21 23:49:29 1 worker working...
2022/09/21 23:49:29 6 worker working...
2022/09/21 23:49:29 4 worker working...
2022/09/21 23:49:29 7 worker working...
2022/09/21 23:49:29 2 worker working...
2022/09/21 23:49:30 9 worker working...
2022/09/21 23:49:30 11 worker working...
2022/09/21 23:49:30 15 worker working...
2022/09/21 23:49:30 13 worker working...
2022/09/21 23:49:30 12 worker working...
2022/09/21 23:49:30 16 worker working...
2022/09/21 23:49:30 10 worker working...
2022/09/21 23:49:30 14 worker working...
2022/09/21 23:49:31 22 worker working...
2022/09/21 23:49:31 21 worker working...
2022/09/21 23:49:31 24 worker working...
2022/09/21 23:49:31 23 worker working...
2022/09/21 23:49:31 18 worker working...
2022/09/21 23:49:31 20 worker working...
2022/09/21 23:49:31 17 worker working...
2022/09/21 23:49:31 19 worker working...
2022/09/21 23:49:32 25 worker working...
2022/09/21 23:49:32 30 worker working...
2022/09/21 23:49:32 28 worker working...
2022/09/21 23:49:32 26 worker working...
2022/09/21 23:49:32 31 worker working...
2022/09/21 23:49:32 27 worker working...
2022/09/21 23:49:32 29 worker working...
2022/09/21 23:49:32 32 worker working...
2022/09/21 23:49:33 33 worker working...
2022/09/21 23:49:33 35 worker working...
2022/09/21 23:49:33 34 worker working...
2022/09/21 23:49:33 36 worker working...
2022/09/21 23:49:33 39 worker working...
2022/09/21 23:49:33 37 worker working...
2022/09/21 23:49:33 38 worker working...
2022/09/21 23:49:33 40 worker working...
2022/09/21 23:49:34 sorry 33
panic: sorry 33

goroutine 1 [running]:
log.Panic({0xc000072f38?, 0xc000094090?, 0xc00006c000?})
/Volumes/axe/tools/runtime/golang/go/src/log/log.go:388 +0x65
main.boss(0x0?, 0xc00001a0d0)
/Users/mardan/T/worker/main.go:43 +0xf2
main.main()
/Users/mardan/T/worker/main.go:64 +0x45
exit status 2

时间计时:go run main.go 5.22 real 0.14 user 0.11 sys

若按正常的串行方式40*1s,至少需要40s才可执行完成,现在仅需5s左右。下面我们再打印goroutine数,发现是正常的,因为我电脑是8核的,符合预期。

出现9是因为,8个goroutine加上main自己就是8+1=9,main也算一个,我们这个goroutine池排除了main,为对这个进行管理。你也可以将main纳入到,但是我觉得没必要。

1
2
3
4
5
6
7
8
9
10
11
12
13
2022/09/21 23:54:02 8 worker working... -> goroutine: 9
2022/09/21 23:54:02 2 worker working... -> goroutine: 7
···
2022/09/21 23:54:02 5 worker working... -> goroutine: 9
2022/09/21 23:54:03 11 worker working... -> goroutine: 7
2022/09/21 23:54:03 9 worker working... -> goroutine: 4
2022/09/21 23:54:03 10 worker working... -> goroutine: 9
2022/09/21 23:54:03 12 worker working... -> goroutine: 9
2022/09/21 23:54:03 13 worker working... -> goroutine: 9
···
2022/09/21 23:54:06 34 worker working... -> goroutine: 9
2022/09/21 23:54:06 37 worker working... -> goroutine: 8
2022/09/21 23:54:06 39 worker working... -> goroutine: 8