目 录CONTENT

文章目录
Go

Go并发编程-Goroutine

Sakura
2023-08-05 / 0 评论 / 1 点赞 / 246 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于438天前,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一: 并发编程概述

并发编程指的是程序由若干个独立运行的代码构成,并发程序主要依赖多核 CPU 的并行运算和调度能力

典型的并发编程的解决方案是多线程并发。多个线程可以属于同一个进程并共享内存空间。因为多线程不需要创建新的虚拟内存空间,所以它们也不需要内存管理单元处理上下文的切换,线程之间的通信也正是基于共享的内存进行的,与重量级的进程相比,线程显得比较轻量。虽然线程比较轻量,但是在调度时也有比较大的额外开销。每个线程会都占用1M以上的内存空间,在切换线程时不止会消耗较多的内存,恢复寄存器中的内容还需要向操作系统申请或者销毁资源,每一次线程上下文的切换都需要消耗一定的时间。

Go 语言在并发编程方面的能力特别强大 , 实现典型的协程的概念 , 内置的 Go 调度器在调度 Goroutine 时开销非常低

并发编程主要考虑以下要素

  1. 程序代码段如何独立运行 --> Gotoutine

  2. 独立运行的代码段如何通信 --> Channel

  3. 如何做数据同步 , 调度同步? --> sync Channel

Go 并发的理念 : 通过通信来共享内存 ( 数据 ) , 而不是共享内存 ( 数据 ) 来通信

二: Groutine

Golang 中使用关键字 go 即可启动新的的 goroutine

// 输出奇数的函数
func PrintOdd() {
    for i := 1; i <= 10; i += 2 {
      fmt.Println(i)
      time.Sleep(100 * time.Millisecond)
    }
}

// 输出偶数的函数
func PrintEven() {
    for i := 2; i <= 10; i += 2 {
      fmt.Println(i)
      time.Sleep(100 * time.Millisecond)
    }
}

func Goroutine() {
    // 在 main goroutine 中,开启新的goroutine
    //并发调用
    go PrintOdd()
    go PrintEven()

    // main goroutine 运行结束
    // 内部调用的goroutine也就结束
    time.Sleep(time.Second)
}

三: sync.WaitGroup 实现协同调度

1. 基本使用

WaitGroup 用于等待一组 goroutine 完成。等待思路是计数器方案:

  1. 调用等待 goroutine 时,调用Add()增加等待的goroutine的数量

  2. 当具体的 goroutine 运行结束后,Done()用来减去计数

  3. 主 goroutine 可以使用 Wait 来阻塞,直到所有 goroutine 都完成 ( 计数器归零 )

func Goroutine() {
	//一: 初始化WaitGroup
	wg := sync.WaitGroup{}

	//二: 累加WG的计数器
	wg.Add(2)
	go func() {
		//三: 并发执行结束后计数器-1
		defer wg.Done()
		for i := 1; i <= 10; i += 2 {
			fmt.Println(i)
			time.Sleep(100 * time.Millisecond)
		}
	}()
	go func() {
		//三: 并发执行结束后计数器-1
		defer wg.Done()
		for i := 2; i <= 10; i += 2 {
			fmt.Println(i)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	//四: 主 goroutine等待
	wg.Wait()
    //执行到Wait会被阻塞,需要等到所有goroutine结束
	fmt.Println("其他goroutine结束")}

注意:

  1. WaitGroup()适用于主 goroutine 需要等待其他 goroutine 全部运行结束后,才结束的情况。

  2. 不适用于 , 主goroutine需要结束,而通知其他goroutine结束的情景

  3. 不能复制 WaitGroup , 因为内部维护的计数器不能被篡改

2. 实现原理

WaitGroup 用于等待一组 goroutine 完成。等待思路是计数器方案:type WaitGroup struct {
	noCopy noCopy
    //高32 bits是计数器 ,低32 bits是等待者
	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

func (wg *WaitGroup) Add(delta int) {
    //核心代码,累加计数器,原子操作
	state := wg.state.Add(uint64(delta) << 32)

}

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

func (wg *WaitGroup) Wait() {

    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        // 如果计数器为0,则不需要等待
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
}

四: 调度的随机性

不能期望goroutine的执行顺序依照源代码的顺序执行,如下代码,会随机输出0-9:

func GoroutineRandom() {
    wg := sync.WaitGroup{}
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func(n int) {
            defer wg.Done()
            fmt.Println(n)
        }(i)
    }
    wg.Wait()
}

五: Goroutine 的并发规模

1. 并发规模

Goroutine 的并发数量上限

  1. 受 goroutine 占用的只能内存限制

  2. 受到内部操作资源限制

  3. gouroutine 本身无上限

函数runtime.NumGoroutine()可以获取当前存在的Goroutine数量。

func GoroutineNum() {
	//1.统计当前存在的Goroutine的数量
	go func() {
		for {
			fmt.Println("NumGoroutine:", runtime.NumGoroutine())
			//每隔半秒钟输出Goroutine的数量
			time.Sleep(500 * time.Millisecond)
		}
	}()

	//2.启动大量的goroutine
	for {
		go func() {
            //防止启动的goroutine执行结束销毁
			time.Sleep(100 * time.Second)
		}()
	}
}
NumGoroutine: 4000000  (如果系统资源足够多的,甚至能达到千万级并发)

runtime: VirtualAlloc of 32768 bytes failed with errno=1455
fatal error: out of memory

2. goroutine 最小为2KB

之所以支持百万级的goroutine并发,核心原因是因为每个 goroutine 的初始栈内存为2KB,用于保持goroutine中的执行数据,例如局部变量等。相对来说,线程的栈内存通常为2MB。除了比较小的初始栈内存外,goroutine的栈内存可扩容的,也就是说支持按需增大或缩小,一个goroutine最大的栈内存当前限制为1GB。

3. gorontine 内部资源竞争溢出

func GoroutineNum() {
	//1.统计当前存在的Goroutine的数量
	go func() {
		for {
			fmt.Println("NumGoroutine:", runtime.NumGoroutine())
			//每个半秒钟输出Goroutine的数量
			time.Sleep(500 * time.Millisecond)
		}
	}()

	//2.启动大量的goroutine
	for {
		go func() {
            // 标准输出句柄的使用
			fmt.Println("Sakura")
			time.Sleep(100 * time.Second)
		}()
	}
}
panic: too many concurrent operations on a single file or socket (max 1048575)

这个不是由于 Goroutime 的数量导致的 , 而是由于大量的 Goroutime 里面操作相同的 File 或者 Socket 导致的

4. 控制并发数量

实际开发时,要根据系统资源和每个 goroutine 所消耗的资源来控制并发规模

一些好用的 goroutine pool 包

  • Jeffail/tunny

  • panjf2000/ants ( Star 数量最多 )

  • go-playground/pool

go get -u github.com/panjf2000/ants/v2
func GoroutineAnts() {
	//1.统计Goroutine数量
	go func() {
		for {
			fmt.Println("Goroutine数量: ", runtime.NumGoroutine())
			time.Sleep(500 * time.Millisecond)
		}
	}()

	//2.初始化协程池 goroutine pool
	Size := 1024
	pool, err := ants.NewPool(Size)
	if err != nil {
		log.Fatalln(err)
	}
	//保证 pool 被关闭
	defer pool.Release()

	//3.利用goroutine pool ,调度需要并发的大量goroutine
	for {
		//想 pool 提交一个执行的goroutine
		pool.Submit(func() {
			v := make([]int, 1024)
			_ = v
			fmt.Println("in goroutine")
			time.Sleep(100 * time.Second)
		})
		if err != nil {
			log.Fatalln(err)
		}
	}
}

六: 并发调度

1. 多对多的协程调度模式

Goroutine 的概念来自于协程 Coroutine,协程,又称微线程,纤程。通过将多个协程映射到特定线程,提高程序(函数)的并发执行能力

通过将多个协程映射到特定线程,提高程序(函数)的并发执行能力 :

  1. 将一个线程的执行分成一段一段的 , 每一段由不用的协程来实现 , 可以不按照固定顺序来调用

  2. 每一段就是一个协程

  • 函数的调用是通过栈 ( 先进后出 ) 实现的 , 通常都是层级调用

- A
  - B
      - C
      - D
  - E
  - F
  • 而协程就是在某个函数执行的过程中 , 可以主动(Python的yield)和被动(go 的goroutine)的被终止执行,转而去执行其他函数。

    • 例如: A调用了B、E、F,可以做到,执行一会B,再去执行E、E没有执行完毕,又暂停去执行B,或F。直到全部执行完毕。

python 需要再函数中主动调用 yield 的关键字才能将执行权限让出 ( 协作式 )

go 语言中默认情况下 , 一个函数执行到一段 , 就会被调度器强制从执行权限里面拿出器 , 让其他函数去执行 ( 抢占式 )

  • 多个协程对应一个线程 : 多个协程通过应用程序自己维护的协程调度器完成多个协程的调度, 多个协程会轮番的用该线程执行

  • 多个协程对应多个线程 : 把多个协程调度到大量的线程中去执行

2. GMP 模型结构

Goroutine 就是 Go 语言实现的协程模型。其核心结构有三个,称为 GMP,也叫 GMP 模型。分别是:

  • G,Goroutine,我们使用关键字go调用的函数。存储于P的本地队列或者是全局队列中

  • M,Machine,就是 Work Thread,就是传统意义的线程,用于执行 Goroutine,G。只有在 M 与具体的 P 绑定后,才能执行 P 中的 G

  • P,Processor,处理器,主要用于协调 G 和 M 之间的关系 ( 具体某个 G 要在那个线程 M 中执行 ),存储需要执行的 G 队列,与特定的 M 绑定后,执行 Go 程序,也就是 G

调度的核心就是让具体的线程 M 去执行对应的 G , 让 M 找到 G , 让 M 更优化的执行大量的 G

  • GMP结构图

    • G 由 M 执行 , P 将 M 和 G 绑定在一起

    • 本地 G 队列有 256 的限制, 全局 G 队列没有限制

图例说明:

  • M程序线程,由OS负责调度交由具体的CPU核心中执行

  • 待执行的G可能存储于全局队列,和某个P的本地队列中。P的本地队列当前最多存储256个G。

  • 若要执行P中的G,则P必须要于对应的M建立联系。建立联系后,就可以执行P中的G了。

  • M与P不是强绑定的关系,若一个M阻塞,那么P就会选择一个新的M执行后续的G。该过程由Go调度器调度。

GMP的关系

  • G 是独立的运行单元

  • M是执行任务的线程

  • P是G和M的关联纽带

  • G要在M中执行,P的任务就是合理的将G分配给M

3. P 的数量和设置

P的数量通常是固定的,当程序启动时由 $GOMAXPROCS环境变量决定创建P的数量。默认的值为当前CPU的核心数所有的 P 都在程序启动时创建

  • Go 程序的执行过程中,最多同时有 $GOMAXPROCSGoroutine 同时运行,默认与 CPU 核数保持一致,可以最大程度利用多核 CPU 并行执行的能力

  • 程序运行时,runtime.GOMAXPROCS()函数可以动态改变 P 的数量,但通常不建议修改,或者即使修改也不建议数量超过 CPU 的核数。调动该函数的典型场景是控制程序的并行规模,例如:

// 可以自行设置需要占用的核数

// 最多利用一半的CPU , 在限制并发能力的场景中需要用到
runtime.GOMAXPROCS(runtime.NumCPU() / 2)

// 获取当前CPU的核数
runtime.NumCPU()
  • Go对 M 的数量做了一个上限,10000个,但通常不会到达这个规模,因为操作系统很难支持这么多的线程。

  • M的数量是由 P 决定的。

  • 当P需要执行时,会去找可用的 M,若没有可用的 M,就会创建新的 M,这就会导致 M 的数量不断增加

  • 当M线程长时间空闲,也就是长时间没有新任务时,GC 会将线程资源回收,这会导致 M 的数量减少

  • 整体来说,M 的数量一定会多于P的数量,取决于空闲( 没有 G 可执行的 )的,和完成其他任务(例如CGO操作,GC操作等)的M的数量多少

4. P 与 G 关联的流程

新创建的 G

  • 新创建的G会优先保持在P的本地队列中。例如 A 函数中执行了 go B(),那么 B 这个 Goroutine会优先保存在 A 所属的 P 的本地队列中

若本地队列已满

  • 若 G 加入 P 的本地队列时本地队列已满,那么 G 会被加入到全局 G 队列中。新 G 加入全局队列时,会把 P 本地队列中一半的 G 也同时移动到全局队列中(是乱序入队列),以保证 P 的本地队列可以继续加入新的 G

当 P 要执行 G 时

  • 会从P的本地队列查找G

  • 若本地队列中没有 G,则会尝试从其他的 P 中偷取 ( Steal ) G 来执行,通常会偷取一半的 G

  • 若无法从其他的P中偷取 G,则从全局 G 队列中获取 G,会一次获取多个 G

本地 G 队列 --> 其他 P 的本地 G 队列 --> 全局 G 队列

当全局运行队列中有待执行的 G 时,还会有固定几率(每61个调度时钟周期 schedtick)会从全局的运行队列中查找对应的 G,为了保证全局 G 队列一定可以被调度

5. P 与 M 关联的流程

P 中关联了大量待执行的 G,若需要执行 G,P 要去找可用的 M。P 不能执行 G,只有 M 才能真正执行 G

当 P 需要执行时候

  • P 要寻找可用的 M,优先从空闲 M池中找,若没有空闲的,则新建 M 来执行

  • 在创建 G 时,G 会尝试唤醒空闲的 M

当M的执行因为G进行了系统调用时

  • M 会释放与之绑定的 P,把 P 转移给其他的 M 去执行。称为 P 抢占

当M执行完的系统调用阻塞的G后

  • M 会尝试获取新的空闲 P,同时将 G 放入 P 的本地队列执行。若没有空闲的 P,则将 G 放入全局 G 队列,M 进入休眠,等待被唤醒或被垃圾回收

6. M0 和 G0

  • M0, 启动程序后的编号为 0 的主线程,负责执行初始化操作和启动第一个 G,也就是 main Goroutine。之后与其他M一样调度。

  • G0,每个 M 创建的第一个 Goroutine。G0 仅用于负责调度的 G,G0 不指向任何可执行的函数,每个 M 都会有一个自己的 G0。在调度或系统调用时会使用 G0 的栈空间。

7. 协作和抢占调度

  • 协作式,主动让出执行权,让其他G执行。通过runtime.Gosched()可以让出执行权。

  • 抢占式,被动让出执行权,也就是调度器将G的执行权取消,分配给其他的G。Go目前默认的方式。在Go中一个G最多可以执行10ms,超时就会被让出调度权。

默认的方式 , 在 Go 中一个 G 最多可以执行 10 ms , 超时就会被让出调度权限

// 该方法可以要求Go主动调度该goroutine,去执行其他的goroutine, 实现协作调度模式
runtime.Gosched()
func GoroutineSched() {

	wg := sync.WaitGroup{}
	wg.Add(2)

	//设置一个P在调度G
	runtime.GOMAXPROCS(1) //单线程模式

	// 输出奇数
	max := 100
	go func() {
		defer wg.Done()
		for i := 1; i < max; i += 2 {
			fmt.Print(i, " ")

			//主动让出
			runtime.Gosched()

			//增加执行时间
			//time.Sleep(1 * time.Millisecond)
		}
	}()

	//输出偶数
	go func() {
		defer wg.Done()
		for i := 2; i < max; i += 2 {
			fmt.Print(i, " ")

			// 主动让出
			runtime.Gosched()

			//增加执行时间
			//time.Sleep(1 * time.Millisecond)
		}
	}()

	wg.Wait()
}

若 goroutine 中,没有 runtime.GoSched ,则会先执行完一个,再执行另一个。若存在 runtime.GoSched,则会交替执行。这就是协作式

/

1

评论区