Go 限制协程执行的基本方法

Golang 537 字

问题定义

很多人在做程序的时候在启动 Goroutine 的时候并没有限制其执行的数量,如果任务只有 100 个,1000 、10000 个 其实都没有问题,但是如果你执行任务非常耗时,而且数量特别大,在不断的开 Goroutine 的时候很有可能就崩溃了。

在服务器上执行任务的时候是需要限制执行数量的,如果超过设置的执行数量就需要进行等待。

现在我们来模拟一下这种情况的出现。

package main

import (
    "fmt"
    "sync"
    "time"
)

func job(i int) {
    time.Sleep(time.Millisecond * 500)
    fmt.Printf("执行完毕,序号 %d \n\n", i)
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10000000; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            job(i)
        }(i)
    }
    wg.Wait()
}

在 Job 函数中,我们故意让函数 sleep 500 毫秒,用来模拟耗时操作,我们来执行一下。

...

执行完毕,序号 286128 

执行完毕,序号 20228 

panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1432450 [running]:
internal/poll.(*fdMutex).rwlock(0x14000128060, 0x0?)
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/internal/poll/fd_mutex.go:147 +0x13c
internal/poll.(*FD).writeLock(...)
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0x14000128060, {0x14131074780, 0x20, 0x30})
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/internal/poll/fd_unix.go:370 +0x48
os.(*File).write(...)
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/os/file_posix.go:48
os.(*File).Write(0x14000126008, {0x14131074780?, 0x20, 0x14070f5f758?})
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/os/file.go:176 +0x64
fmt.Fprintf({0x104434438, 0x14000126008}, {0x1043fe079, 0x1b}, {0x14070f5f758, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/fmt/print.go:205 +0x88
fmt.Printf(...)
        /opt/homebrew/Cellar/go/1.18.2/libexec/src/fmt/print.go:213
main.job(0x0?)
        /Users/guozhu/WorkSpace/study-go/go/main.go:11 +0x78
main.main.func1(0x0?)
        /Users/guozhu/WorkSpace/study-go/go/main.go:20 +0x50
created by main.main
        /Users/guozhu/WorkSpace/study-go/go/main.go:18 +0x40

Process finished with the exit code 2

刚开始执行其实是没问题的,但是等执行一会后 Go 就会报错了,panic: too many concurrent operations on a single file or socket (max 1048575)

对单个 file/socket 的并发操作个数超过了系统上限,这个报错是 fmt.Printf 函数引起的,fmt.Printf 将格式化后的字符串打印到屏幕,即标准输出。在 linux 系统中,标准输出也可以视为文件,内核(kernel)利用文件描述符(file descriptor)来访问文件,标准输出的文件描述符为 1,错误输出文件描述符为 2,标准输入的文件描述符为 0,简而言之就是内存耗尽了。

那如果我们将 fmt.Printf 这行代码去掉呢?那程序很可能会因为内存不足而崩溃。这一点更好理解,每个协程至少需要消耗 2KB 的空间,那么假设计算机的内存是 2GB,那么至多允许 2GB/2KB = 1M 个协程同时存在。那如果协程中还存在着其他需要分配内存的操作,那么允许并发执行的协程将会数量级地减少。

利用 Channel 限制 Goroutine 数量

make(chan struct{}, 3) 创建缓冲区大小为 3 的 channel,在没有被接收的情况下,至多发送 3 个消息则被阻塞。

利用这样特性,我们会可以在开启 Goroutine 调用 <- struct{}{},若缓存区满了则会被阻塞。当协程任务执行结束,在调用 <- ch 释放缓冲区,这样就可以限制 Goroutine 的数量了。

package main

import (
    "fmt"
    "sync"
    "time"
)

func job(i int) {
    time.Sleep(time.Millisecond * 500)
    fmt.Printf("执行完毕,序号 %d \n\n", i)
}

var pool chan struct{}

func main() {
    pool = make(chan struct{}, 1000)

    wg := sync.WaitGroup{}

    for i := 0; i < 10000000; i++ {
        pool <- struct{}{}
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            defer func() {
                <-pool
            }()
            job(i)
        }(i)
    }
    wg.Wait()
}
maksim
Maksim(一笑,吡罗),PHPer,Goper
OωO
开启隐私评论,您的评论仅作者和评论双方可见