简介

处理大量并发是 Go 语言的一大优势。语言内置了方便的并发语法,可以非常方便的创建很多个轻量级的 goroutine 并发处理任务。相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine,更何况系统还需要保留一部分内存运行日常管理任务,go 运行时需要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),导致性能急速下降。我们可以简单验证一下创建过多 goroutine 会发生什么:

func main() {var wg sync.WaitGroupwg.Add(10000000)for i := 0; i < 10000000; i++ {go func() {time.Sleep(1 * time.Minute)}()}wg.Wait()
}

在我的机器上(8G内存)运行上面的程序会报errno 1455,即Out of Memory错误,这很好理解。谨慎运行

另一方面,goroutine 的管理也是一个问题。goroutine 只能自己运行结束,外部没有任何手段可以强制j结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束,就会出现 goroutine 泄露。此外,频繁创建 goroutine 也是一个开销。

鉴于上述原因,自然出现了与线程池一样的需求,即 goroutine 池。一般的 goroutine 池自动管理 goroutine 的生命周期,可以按需创建,动态缩容。向 goroutine 池提交一个任务,goroutine 池会自动安排某个 goroutine 来处理。

ants就是其中一个实现 goroutine 池的库。

快速使用

本文代码使用 Go Modules。

创建目录并初始化:

$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants

安装ants库,使用v2版本:

$ go get -u github.com/panjf2000/ants/v2

我们接下来要实现一个计算大量整数和的程序。首先创建基础的任务结构,并实现其执行任务方法:

type Task struct {index intnums  []intsum   intwg    *sync.WaitGroup
}func (t *Task) Do() {for _, num := range t.nums {t.sum += num}t.wg.Done()
}

很简单,就是将一个切片中的所有整数相加。

然后我们创建 goroutine 池,注意池使用完后需要手动关闭,这里使用defer关闭:

p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()func taskFunc(data interface{}) {task := data.(*Task)task.Do()fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}

上面调用了ants.NewPoolWithFunc()创建了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 个 goroutine。第二个参数为每次执行任务的函数。当我们调用p.Invoke(data)的时候,ants池会在其管理的 goroutine 中找出一个空闲的,让它执行函数taskFunc,并将data作为参数。

接着,我们模拟数据,做数据切分,生成任务,交给 ants 处理:

const (DataSize    = 10000DataPerTask = 100
)nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {task := &Task{index: i + 1,nums:  nums[i*DataPerTask : (i+1)*DataPerTask],wg:    &wg,}tasks = append(tasks, task)p.Invoke(task)
}wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())

随机生成 10000 个整数,将这些整数分为 100 份,每份 100 个,生成Task结构,调用p.Invoke(task)处理。wg.Wait()等待处理完成,然后输出ants正在运行的 goroutine 数量,这时应该是 0。

最后我们将结果汇总,并验证一下结果,与直接相加得到的结果做一个比较:

var sum int
for _, task := range tasks {sum += task.sum
}var expect int
for _, num := range nums {expect += num
}fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

运行:

$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172

确实,任务完成之后,正在运行的 goroutine 数量变为 0。而且我们验证了,结果没有偏差。另外需要注意,goroutine 池中任务的执行顺序是随机的,与提交任务的先后没有关系。由上面运行打印的任务标识我们也能发现这一点。

函数作为任务

ants支持将一个不接受任何参数的函数作为任务提交给 goroutine 运行。由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,否则就必须用某种方式将需要的数据传递进去,例如闭包。

提交函数作为任务的 goroutine 池使用ants.NewPool()创建,它只接受一个参数表示池子的容量。调用池子对象的Submit()方法来提交任务,将一个不接受任何参数的函数传入。

最开始的例子可以改写一下。增加一个任务包装函数,将任务需要的参数作为包装函数的参数。包装函数返回实际的任务函数,该任务函数就可以通过闭包访问它需要的数据了:

type taskFunc func()func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}fmt.Printf("task:%d sum:%d\n", i+1, *sum)wg.Done()}
}

调用ants.NewPool(10)创建 goroutine 池,同样池子用完需要释放,这里使用defer

p, _ := ants.NewPool(10)
defer p.Release()

生成模拟数据,切分任务。提交任务给ants池执行,这里使用taskFuncWrapper()包装函数生成具体的任务,然后调用p.Submit()提交:

nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()

汇总结果,验证:

var sum int
for _, partSum := range partSums {sum += partSum
}var expect int
for _, num := range nums {expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

这个程序的功能与最开始的完全相同。

执行流程

GitHub 仓库中有个执行流程图,我重新绘制了一下:

执行流程如下:

  • 初始化 goroutine 池;

  • 提交任务给 goroutine 池,检查是否有空闲的 goroutine:

    • 已到上限,检查 goroutine 池是否是非阻塞的:

    • 未到上限,创建一个新的 goroutine 处理任务

    • 非阻塞,直接返回nil表示执行失败

    • 阻塞,等待 goroutine 空闲

    • 有,获取空闲 goroutine

    • 无,检查池中的 goroutine 数量是否已到池容量上限:

  • 任务处理完成,将 goroutine 交还给池,以待处理下一个任务

选项

ants提供了一些选项可以定制 goroutine 池的行为。选项使用Options结构定义:

// src/github.com/panjf2000/ants/options.go
type Options struct {ExpiryDuration time.DurationPreAlloc boolMaxBlockingTasks intNonblocking boolPanicHandler func(interface{})Logger Logger
}

各个选项含义如下:

  • ExpiryDuration:过期时间。表示 goroutine 空闲多长时间之后会被ants池回收

  • PreAlloc:预分配。调用NewPool()/NewPoolWithFunc()之后预分配worker(管理一个工作 goroutine 的结构体)切片。而且使用预分配与否会直接影响池中管理worker的结构。见下面源码

  • MaxBlockingTasks:最大阻塞任务数量。即池中 goroutine 数量已到池容量,且所有 goroutine 都处理繁忙状态,这时到来的任务会在阻塞列表等待。这个选项设置的是列表的最大长度。阻塞的任务数量达到这个值后,后续任务提交直接返回失败

  • Nonblocking:池是否阻塞,默认阻塞。提交任务时,如果ants池中 goroutine 已到上限且全部繁忙,阻塞的池会将任务添加的阻塞列表等待(当然受限于阻塞列表长度,见上一个选项)。非阻塞的池直接返回失败

  • PanicHandler:panic 处理。遇到 panic 会调用这里设置的处理函数

  • Logger:指定日志记录器

NewPool()部分源码:

if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}

使用预分配时,创建loopQueueType类型的结构,反之创建stackType类型。这是ants定义的两种管理worker的数据结构。

ants定义了一些With*函数来设置这些选项:

func WithOptions(options Options) Option {return func(opts *Options) {*opts = options}
}func WithExpiryDuration(expiryDuration time.Duration) Option {return func(opts *Options) {opts.ExpiryDuration = expiryDuration}
}func WithPreAlloc(preAlloc bool) Option {return func(opts *Options) {opts.PreAlloc = preAlloc}
}func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}func WithNonblocking(nonblocking bool) Option {return func(opts *Options) {opts.Nonblocking = nonblocking}
}func WithPanicHandler(panicHandler func(interface{})) Option {return func(opts *Options) {opts.PanicHandler = panicHandler}
}func WithLogger(logger Logger) Option {return func(opts *Options) {opts.Logger = logger}
}

这里使用了 Go 语言中非常常见的一种模式,我称之为选项模式,非常方便地构造有大量参数,且大部分有默认值或一般不需要显式设置的对象。

我们来验证几个选项。

最大等待队列长度

ants池设置容量之后,如果所有的 goroutine 都在处理任务。这时提交的任务默认会进入等待队列,WithMaxBlockingTasks(maxBlockingTasks int)可以设置等待队列的最大长度。超过这个长度,提交任务直接返回错误:

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done()}
}func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))defer p.Release()var wg sync.WaitGroupwg.Add(8)for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}(i)}wg.Wait()
}

上面代码中,我们设置 goroutine 池的容量为 4,最大阻塞队列长度为 2。然后一个 for 提交 8 个任务,期望结果是:4 个任务在执行,2 个任务在等待,2 个任务提交失败。运行结果:

hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2

我们看到提交任务失败,打印too many goroutines blocked ...

代码中有 4 点需要注意:

  • 提交任务必须并行进行。如果是串行提交,第 5 个任务提交时由于池中没有空闲的 goroutine 处理该任务,Submit()方法会被阻塞,后续任务就都不能提交了。也就达不到验证的目的了

  • 由于任务可能提交失败,失败的任务不会实际执行,所以实际上wg.Done()次数会小于 8。因而在err != nil分支中我们需要调用一次wg.Done()。否则wg.Wait()会永远阻塞

  • 为了避免任务执行过快,空出了 goroutine,观察不到现象,每个任务中我使用time.Sleep(1 * time.Second)休眠 1s

  • 由于 goroutine 之间的执行顺序未显式同步,故每次执行的顺序不确定

由于简单起见,前面的例子中Submit()方法的返回值都被我们忽略了。实际开发中一定不要忽略。

非阻塞

ants池默认是阻塞的,我们可以使用WithNonblocking(nonblocking bool)设置其为非阻塞。非阻塞的ants池中,在所有 goroutine 都在处理任务时,提交新任务会直接返回错误:

func main() {p, _ := ants.NewPool(2, ants.WithNonblocking(true))defer p.Release()var wg sync.WaitGroupwg.Add(3)for i := 1; i <= 3; i++ {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}wg.Wait()
}

使用上个例子中的wrapper()函数,ants池容量设置为 2。连续提交 3 个任务,期望结果前两个任务正常执行,第 3 个任务提交时返回错误:

hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1

panic 处理器

一个鲁棒性强的库一定不会忽视错误的处理,特别是宕机相关的错误。在 Go 语言中就是 panic,也被称为运行时恐慌,在程序运行的过程中产生的严重性错误,例如索引越界,空指针解引用等,都会触发 panic。如果不处理 panic,程序会直接意外退出,可能造成数据丢失的严重后果。

ants中如果 goroutine 在执行任务时发生panic,会终止当前任务的执行,将发生错误的堆栈输出到os.Stderr注意,该 goroutine 还是会被放回池中,下次可以取出执行新的任务

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)if i%2 == 0 {panic(fmt.Sprintf("panic from task:%d", i))}wg.Done()}
}func main() {p, _ := ants.NewPool(2)defer p.Release()var wg sync.WaitGroupwg.Add(3)for i := 1; i <= 2; i++ {p.Submit(wrapper(i, &wg))}time.Sleep(1 * time.Second)p.Submit(wrapper(3, &wg))p.Submit(wrapper(5, &wg))wg.Wait()
}

我们让偶数个任务触发panic。提交两个任务,第二个任务一定会触发panic。触发panic之后,我们还可以继续提交任务 3、5。注意这里没有 4,提交任务 4 还是会触发panic

上面的程序需要注意 2 点:

  • 任务函数中wg.Done()是在panic方法之后,如果触发了panic,函数中的其他正常逻辑就不会再继续执行了。所以我们虽然wg.Add(3),但是一共提交了 4 个任务,其中一个任务触发了panicwg.Done()没有正确执行。实际开发中,我们一般使用defer语句来确保wg.Done()一定会执行

  • 在 for 循环之后,我添加了一行代码time.Sleep(1 * time.Second)。如果没有这一行,后续的两条Submit()方法可以直接执行,可能会导致任务很快就完成了,wg.Wait()直接返回了,这时panic的堆栈还没有输出。你可以尝试注释掉这行代码运行看看结果

除了ants提供的默认 panic 处理器,我们还可以使用WithPanicHandler(paincHandler func(interface{}))指定我们自己编写的 panic 处理器。处理器的参数就是传给panic的值:

func panicHandler(err interface{}) {fmt.Fprintln(os.Stderr, err)
}p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()

其余代码与上面的完全相同,指定了panicHandler后触发panic就会执行它。运行:

hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3

看到输出了传给panic函数的字符串(第二行输出)。

默认池

为了方便使用,很多 Go 库都喜欢提供其核心功能类型的一个默认实现。可以直接通过库提供的接口调用。例如net/http,例如antsants库中定义了一个默认的池,默认容量为MaxInt32。goroutine 池的各个方法都可以直接通过ants包直接访问:

// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)func Submit(task func()) error {return defaultAntsPool.Submit(task)
}func Running() int {return defaultAntsPool.Running()
}func Cap() int {return defaultAntsPool.Cap()
}func Free() int {return defaultAntsPool.Free()
}func Release() {defaultAntsPool.Release()
}func Reboot() {defaultAntsPool.Reboot()
}

直接使用:

func main() {defer ants.Release()var wg sync.WaitGroupwg.Add(2)for i := 1; i <= 2; i++ {ants.Submit(wrapper(i, &wg))}wg.Wait()
}

默认池也需要Release()

总结

本文介绍了 goroutine 池的由来,并借由ants库介绍了基本的使用方法,和一些细节。ants源码不多,去掉测试的核心代码只有 1k 行左右,建议有时间、感兴趣的童鞋深入阅读。

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue????

参考

  1. ants GitHub:github.com/valyala/ants

  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~

Go 每日一库之 ants相关推荐

  1. go get 失败 no go files in_Go 每日一库之 dig

    简介 今天我们来介绍 Go 语言的一个依赖注入(DI)库--dig.dig 是 uber 开源的库.Java 依赖注入的库有很多,相信即使不是做 Java 开发的童鞋也听过大名鼎鼎的 Spring.相 ...

  2. go float64 比较_Go 每日一库之 plot

    Go 每日一库之 plot 简介 本文介绍 Go 语言的一个非常强大.好用的绘图库--plot.plot内置了很多常用的组件,基本满足日常需求.同时,它也提供了定制化的接口,可以实现我们的个性化需求. ...

  3. go get 的不再src目录中_Go 每日一库之 sqlc:根据 sql 生成代码

    简介 在 Go 语言中编写数据库操作代码真的非常痛苦!database/sql标准库提供的都是比较底层的接口.我们需要编写大量重复的代码.大量的模板代码不仅写起来烦,而且还容易出错.有时候字段类型修改 ...

  4. go 默认http版本_【每日一库】超赞的 Go 语言 INI 文件操作

    点击上方蓝色"Go语言中文网"关注我们,领全套Go资料,每天学习 Go 语言 如果你使用 INI 作为系统的配置文件,那么一定会使用这个库吧.没错,它就是号称地表 最强大.最方便  ...

  5. go get如何删除_Go 每日一库之 xorm

    简介 Go 标准库提供的数据库接口database/sql比较底层,使用它来操作数据库非常繁琐,而且容易出错.因而社区开源了不少第三方库,如上一篇文章中的sqlc工具,还有各式各样的 ORM (Obj ...

  6. go 根据输入类型执行对应的方法_Go 每日一库之 sqlc

    简介 在 Go 语言中编写数据库操作代码真的非常痛苦!database/sql标准库提供的都是比较底层的接口.我们需要编写大量重复的代码.大量的模板代码不仅写起来烦,而且还容易出错.有时候字段类型修改 ...

  7. Go 每日一库之 zap

    转载地址:Go 每日一库之 zap - SegmentFault 思否 简介 在很早之前的文章中,我们介绍过 Go 标准日志库log和结构化的日志库logrus.在热点函数中记录日志对日志库的执行性能 ...

  8. 每日一库之Go 强大而灵活的电子邮件库:email

    发送邮件是一个很常见的需求:用户邮箱验证.邮箱召回等.Go 语言标准库自带 net/smtp 库,实现了 smtp 协议,用于发送邮件.然而这个库比较原始,使用不方便,而且官方声明不再增加新功能.于是 ...

  9. Go 每日一库之 xorm

    简介 Go 标准库提供的数据库接口database/sql比较底层,使用它来操作数据库非常繁琐,而且容易出错.因而社区开源了不少第三方库,如上一篇文章中的sqlc工具,还有各式各样的 ORM (Obj ...

最新文章

  1. C++ Primer 5th笔记(chap 14 重载运算和类型转换)输入和输出运算符
  2. python统计单词频率、存放在字典中_Python3实现统计单词表中每个字母出现频率的方法示例...
  3. pthread_create函数编译时报错:undefined reference to 'pthread_create'
  4. apt-get 与 yum的区别 (转)
  5. hadoop容灾能力测试
  6. php OpenSSL 加解密
  7. BlazorCharts 原生图表库的建设历程
  8. 颜宁分享干货:给实验室博士的一些忠告
  9. delphi random 六位_《蒙面唱将猜猜猜》第五季将播,六位唱将率先登场
  10. appender log4j 扩展_Log4j扩展使用--输出地Appender
  11. 基于CSE的微服务工程实践-多微服务框架演进
  12. java用户注册模块_用户注册登录模块设计方案报告.docx
  13. 法学类计算机专业,就业蓝皮书:计算机类专业领跑薪酬榜 法学专业被亮“红牌”...
  14. VSCode中Clangd无法找到stdio.h
  15. 论文图表录 出现几个 错误标签未定义的简单解决方法
  16. 组策略命令应用设置大全
  17. 开发一个商城需要多少钱 做一个电商网站大概多少钱
  18. numpy.arange()参数含义
  19. 3D立方体图片切换动画
  20. Java项目:SpringBoot图书管理系统

热门文章

  1. 2020.12.10【读书笔记】丨Survey二代数据质控
  2. 使用QTableWidget在Python界面中画表格
  3. uml点餐系统活动图_UML活动图(Activity Diagram)
  4. 微型计算机系统采用总,微型计算机系统采用总线对CPU、存储器设备进行连接。他们主要负责传送的信号是______。...
  5. 油猴脚本+钉钉机器人实现实时合并提醒
  6. NYOJ589 糖果
  7. Unity实现多屏分屏效果(新手向)
  8. CDN缓存加速系统wdcdn3.1版本发布(20120929)
  9. 当代大学生,千万别以辍学为荣!
  10. vue中element ie9的兼容问题