目 录CONTENT

文章目录

Go协程与管道

Sakura
2023-10-07 / 0 评论 / 0 点赞 / 1 阅读 / 19047 字 / 正在检测是否收录...

1. 概念

  1. 程序(program):是完成特定任务、用某种语言编写的一组指令的集合,是一段静态的代码(程序是静态的)

  2. 进程(process):是程序的一次执行过程。正在运行的一个程序,进程作为资源分配的单位,在内存中会为每个进程分配不同的内存区域。是一个动的过程,有它自身的产生、存在、消亡的过程。(进程是动态的)

  3. 线程(thread):进程可以进一步的细化为线程,是一个程序内部的一条执行线路。若一个进程同一时间并行执行多个线程,就是支持多线程的。

  4. 协程(goroutine):又称为微线程,纤程,协程是一种用户态的轻量级线程。

2. 协程

协程的作用:在执行A函数的时候,可以随时中断,去执行B函数,然后中断继续执行A函数(可以自动切换)。这一切换过程没有调用语句,过程很像多线程,然后协程只有一个线程在执行。协程的本质就是一个单线程。

对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就将寄存器上下文和栈保存到某个其他地方,然后切换到另外一个任务去计算。在任务切回来的时候,恢复先前保存的寄存器上下文和栈,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而会更多的将cpu的执行权限分配给我们的线程(注意:线程是CPU控制的,而协程是程序自身控制的,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级)

2.1. 协程入手

package main

import (
	"fmt"
	"strconv"
	"time"
)

func test() {
	for i := 1; i <= 10; i++ {
		fmt.Println("hello,golang + " + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

func main() {
	//主线程
	test()

	for i := 1; i <= 10; i++ {
		fmt.Println("hello,golang + " + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

/*
hello,golang + 1
hello,golang + 2
hello,golang + 3
hello,golang + 4
hello,golang + 5
hello,golang + 6
hello,golang + 7
hello,golang + 8
hello,golang + 9
hello,golang + 10
hello,golang + 1
hello,golang + 2
hello,golang + 3
hello,golang + 4
hello,golang + 5
hello,golang + 6
hello,golang + 7
hello,golang + 8
hello,golang + 9
hello,golang + 10
*/

从这个代码可以看出,未开启协程,运行并非是并行。


接下来就需要开启协程用go关键字就可以开启协程

package main

import (
	"fmt"
	"strconv"
	"time"
)

func test() {
	for i := 1; i <= 10; i++ {
		fmt.Println("hello,golang + " + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

func main() {
	//主线程
	go test() //开启协程

	for i := 1; i <= 10; i++ {
		fmt.Println("hello,golang + " + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

/*
hello,golang + 1
hello,golang + 1
hello,golang + 2
hello,golang + 2
hello,golang + 3
hello,golang + 3
hello,golang + 4
hello,golang + 4
hello,golang + 5
hello,golang + 5
hello,golang + 6
hello,golang + 6
hello,golang + 7
hello,golang + 7
hello,golang + 8
hello,golang + 8
hello,golang + 9
hello,golang + 9
hello,golang + 10
hello,golang + 10
*/

这里有一个原则:主死从随

当主线程结束的时候,跟随的协程也会随即死掉

2.2. 协程控制

package main

import (
    "fmt"
    "strconv"
    "time"
)

func test() {
    for i := 1; i <= 10; i++ {
        fmt.Println("hello,golang2 + " + strconv.Itoa(i))
        time.Sleep(time.Second)
    }
}

func main() {
    //主线程
    go test() //开启协程

    for i := 1; i <= 5; i++ {
        fmt.Println("hello,golang1 + " + strconv.Itoa(i))
        time.Sleep(time.Second)
    }
}

/*
hello,golang1 + 1
hello,golang2 + 1
hello,golang2 + 2
hello,golang1 + 2
hello,golang2 + 3
hello,golang1 + 3
hello,golang1 + 4
hello,golang2 + 4
hello,golang2 + 5
hello,golang1 + 5
*/

从这里就可以看出,如果主线程退出了,即使协程并没有运行完成,也会退出。当然协程也可以在主线程没有退出之前就自己结束了,比如完成了自己的任务。


package main

import (
    "fmt"
    "time"
)

func main() {
    //匿名函数 + 外部变量 = 闭包 
    for i := 1; i <= 5; i++ {
        //使用匿名函数,直接调用匿名函数
        go func() {
            fmt.Println(i)
        }()
    }

    time.Sleep(time.Second)
}

/*
PS E:\CODE\Go_use\src\test1\main> go run main.go
6
6
6
6
6
PS E:\CODE\Go_use\src\test1\main> go run main.go
6
6
3
4
6
PS E:\CODE\Go_use\src\test1\main> go run main.go
6
4
6
3
6

*/

可以看出,形成闭包之后,协程输出的数很随机,并没有想象中的有序。这里就需要将值传入进来

package main

import (
    "fmt"
    "time"
)

func main() {
    //匿名函数 + 外部变量 = 闭包
    for i := 1; i <= 5; i++ {
        //使用匿名函数,直接调用匿名函数
        go func(n int) {
            fmt.Println(n)
        }(i)
    }

    time.Sleep(time.Second)
}

/*
PS E:\CODE\Go_use\src\test1\main> go run main.go
5
2
4
1
3
PS E:\CODE\Go_use\src\test1\main> go run main.go
5
1
4
3
2
PS E:\CODE\Go_use\src\test1\main> go run main.go
5
2
4
1
3
*/

当使用传入值时,虽然顺序还是会乱,但是值起码都存在且正常。

2.3. 使用WaitGroup控制协程退出

func main() {
    //匿名函数 + 外部变量 = 闭包
    for i := 1; i <= 5; i++ {
        //使用匿名函数,直接调用匿名函数
        go func(n int) {
            fmt.Println(n)
        }(i)
    }

    time.Sleep(time.Second)
}

在这里写time.Sleep(time.Second)是为了控制主线程不要退出,防止协程随着暴死。但是具体需要控制多少时间,我们也不清楚。这里就引入了WaitGroup函数了

该函数用于等待一组线程的结束。父线程1调用Add方法来设定应该等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。--->解决主线程在子协程结束后自动结束

主要函数有:

func (*WaitGroup) Add(delta int)

func (*WaitGroup) Done()

func (*WaitGroup) Wait()

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func main() {
    //启动五个协程
    for i := 1; i <= 5; i++ {
        wg.Add(1) //协程开始的时候执行加一
        go func(n int) {
            fmt.Println(n)
            wg.Done() //协程执行完成减一
        }(i)
    }
    //主线程一直在阻塞,什么时候wg减到零了,就停止。
    wg.Wait()
}

/*
5
1
4
2
3
*/

2.4. 多个协程操作同一个数据案例

package main

import (
    "fmt"
    "sync"
)

var total int
var wg sync.WaitGroup

func add() {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        total += 1
    }
}

func main() {
    wg.Add(1)
    //启动协程

    go add()
    wg.Wait()
    fmt.Println(total)//100
}

package main

import (
    "fmt"
    "sync"
)

var total int
var wg sync.WaitGroup

func add() {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        total += 1
    }
}

func sub() {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        total -= 1
    }
}

func main() {
    wg.Add(2)
    //启动协程

    go add()
    go sub()
    wg.Wait()
    fmt.Println(total) //0
}

理论上这个结果应当是零,但实际运行时,有些时候并不是零。

问题出现的原因有很多种可能,协程运行混乱、争抢资源是最大的可能性之一。

所以我们需要上互斥锁,防止混乱。

2.5. sync包的使用

golang中的sync包实现了两种锁 Mutex(互斥锁)和RWMutex(读写锁)

  1. 互斥锁

Lock()加锁,Unlock()解锁。使用Lock()后便不能对其再次加锁,直到使用Unlock()进行解锁,才能再次加锁。适用于读写不确定的场景,即读写次数没有明显区别。——性能和效率偏低。

  1. 读写锁

经常使用于读次数多于写次数的场景——在读的时候,数据之间不产生影响,写和读之间才产生影响。

import "sync"

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

本包的类型的值不应被拷贝。

Index

2.5.1. 使用互斥锁同步协程

确保一个协程在执行逻辑的同时另一个协程不执行

package main

import (
    "fmt"
    "sync"
)

var total int
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        //加锁
        lock.Lock()
        total += 1
        //解锁
        lock.Unlock()
    }
}

func sub() {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        //加锁
        lock.Lock()
        total -= 1
        //解锁
        lock.Unlock()
    }
}

func main() {
    wg.Add(2)
    //启动协程

    go add()
    go sub()
    wg.Wait()
    fmt.Println(total) //0
}

此时出现的结果就一定是0

2.5.2. 读写锁的使用

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup
var lock sync.RWMutex

func Read() {
    defer wg.Done()
    lock.RLock() //加入读写锁,如果只是读数据,这个锁不产生影响,但是读写同时发生的时候,就会有影响
    fmt.Println("开始读取数据")
    time.Sleep(time.Second)
    fmt.Println("读取数据成功")
    lock.RUnlock()
}

func Write() {
    defer wg.Done()
    lock.RLock() //加入读写锁,如果只是读数据,这个锁不产生影响,但是读写同时发生的时候,就会有影响
    fmt.Println("开始修改数据")
    time.Sleep(time.Second)
    fmt.Println("修改数据成功")
    lock.RUnlock()
}

func main() {
    wg.Add(6)
    //启动协程
    for i := 0; i < 6; i++ {
        go Read()
    }

    go Write()
    wg.Wait()
}

/*
开始读取数据
开始读取数据
开始读取数据
开始修改数据==
开始读取数据
开始读取数据
开始读取数据
读取数据成功--
修改数据成功==
*/

可以看出使用读写锁之后,读协程可以并发执行,写协程不可以,只能等待读协程全部完成。

3. 管道(channel)

管道的本质就是队列,先进先出(FIFO)

自身就是线程安全的,多协程访问的时候,不需要加锁,本身就是线程安全。

不过管道也是有类型的,一个string的管道就只能存放string类型的数据

3.1. 管道的定义

var 变量名 chan 数据类型

数据类型是指里面放入的数据的类型,管道是由类型的

管道是引用类型,必须初始化才能写入数据,即make

package main

import "fmt"

func main() {
	//定义管道
	var a chan int
	//通过make初始化
	a = make(chan int, 3)

	fmt.Printf("a的值: %v\n", a) //a的值: 0xc0000b6100

	//向管道存入数据
	a <- 10
	num := 20
	a <- num

	fmt.Printf("管道的实际长度:%v,管道的容量:%v\n", len(a), cap(a))
	//管道的实际长度:2,管道的容量:3
	fmt.Println(a)
	//0xc0000b6100

	num1 := <-a
	fmt.Println(num1) //10
	fmt.Printf("管道的实际长度:%v,管道的容量:%v\n", len(a), cap(a))
	//管道的实际长度:1,管道的容量:3
}

注意:管道不能存放大于容量的数据,取得数据大于长度,再取也会报错。

3.2. 管道的关闭

package main

import "fmt"

func main() {
    //定义管道
    var a chan int
    a = make(chan int, 3)

    a <- 110
    a <- 20
    close(a)

    num := <-a
    fmt.Println(num)//110

    a <- 10 //panic: send on closed channel

}

根据这个例子可以看出,当管道关闭之后,是指入口关闭,不能写,但是依旧可以读。

3.3. 管道的遍历

管道支持for-range的方式进行遍历

  1. 在遍历时,如果管道没有关闭,则会出现deadlock的错误

  2. 在遍历时,如果管道已经关闭,则正常遍历数据,遍历完后就会退出遍历

package main

import "fmt"

func main() {
    //定义管道
    var a chan int
    a = make(chan int, 10)

    for i := 0; i < 10; i++ {
        a <- i
    }
    close(a)
    //在遍历前如果没有关闭管道就会报错
    for v := range a {
        fmt.Println("value=", v)
    }

}
/*
value= 0
value= 1
value= 2
value= 3
value= 4
value= 5
value= 6
value= 7
value= 8
value= 9
*/

3.4. 声明只读只写管道

package main

import "fmt"

func main() {
    //声明只写:
    var a1 chan<- int
    a1 = make(chan int, 3)
    a1 <- 20
    fmt.Println("a1:", a1) //a1: 0xc00007c100
    //声明只读:
    var a2 <-chan int
    fmt.Println("a2", a2) //a2 <nil>
}

3.5. 管道的阻塞

  1. 当管道只写入数据,没有读取,就会出现阻塞

  2. 写的快,读的慢。读写频率不一致时,也不会出现阻塞

4. 协程与管道共同工作

案例需求:

  1. 开启一个writeData协程,向管道中写入50个整数

  2. 开启一个readData协程,从管道中读取writeData写入的数据

  3. 注意:writeData和readData操作的是同一个管道

  4. 主线程需要等待writeData和readData协程都完成工作才能退出

package main

import "fmt"

func writeData(ma chan int) {
    for i := 1; i <= 10; i++ {
        ma <- i
        fmt.Println("写入的数据为:", i)
    }
}

func readData(ma chan int) {
    for i2 := range ma {
        fmt.Println("读出的数据为:", i2)
    }
}

func main() {
    ma := make(chan int, 10)
    writeData(ma)
    close(ma)
    readData(ma)
}
/*
写入的数据为: 1
写入的数据为: 2
写入的数据为: 3
写入的数据为: 4
写入的数据为: 5
写入的数据为: 6
写入的数据为: 7
写入的数据为: 8
写入的数据为: 9
写入的数据为: 10
读出的数据为: 1
读出的数据为: 2
读出的数据为: 3
读出的数据为: 4
读出的数据为: 5
读出的数据为: 6
读出的数据为: 7
读出的数据为: 8
读出的数据为: 9
读出的数据为: 10
*/

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func writeData(ma chan int) {
    defer wg.Done()
    for i := 1; i <= 10; i++ {
        ma <- i
        fmt.Println("写入的数据为:", i)
        time.Sleep(time.Second)
    }
    close(ma)
}

func readData(ma chan int) {
    defer wg.Done()
    for i2 := range ma {
        fmt.Println("读出的数据为:", i2)
    }
}

func main() {
    wg.Add(2)
    ma := make(chan int, 10)
    go writeData(ma)
    go readData(ma)
    wg.Wait()
}

/*
写入的数据为: 1
读出的数据为: 1
写入的数据为: 2
读出的数据为: 2
写入的数据为: 3
读出的数据为: 3
写入的数据为: 4
读出的数据为: 4
写入的数据为: 5
读出的数据为: 5
写入的数据为: 6
读出的数据为: 6
写入的数据为: 7
读出的数据为: 7
写入的数据为: 8
读出的数据为: 8
写入的数据为: 9
读出的数据为: 9
写入的数据为: 10
读出的数据为: 10
*/

但是不在writeData中添加time.Sleep(time.Second)就不会进行并行运算。

5. select功能

select解决多个管道的选择问题,也可以叫多路复用,可以从多个管道中随机公平地选择一个来执行

不过case之后必须进行io操作,不能是等值,随机去选择一个io操作

default防止select被阻塞住,加入default

package main

import (
    "fmt"
    "time"
)

func main() {
    //定义一个int类型管道
    intChan := make(chan int, 1)
    go func() {
        time.Sleep(time.Second * 2)
        intChan <- 10
    }()
    //定义一个string类型管道
    stringChan := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 1)
        stringChan <- "leijianx"
    }()

    select {
        case v := <-intChan:
        fmt.Println("intChan:", v)
        case v := <-stringChan:
        fmt.Println("stringChan", v)
        default:
        fmt.Println("防止select被阻塞")
    }
    //stringChan leijianx
}

6. defer + recover

  1. 多个协程工作,其中一个协程出现panic,导致程序崩溃

  2. 利用defer+recover捕获panic进行处理,即使协程出现问题,主线程仍然不受影响可以继续执行

package main

import (
    "fmt"
    "time"
)

func printNum() {
    for i := 1; i < 11; i++ {
        fmt.Println(i)
    }
}

func devide() {
    defer func() {
        err := recover()
        if err != nil {
            fmt.Println("devide()出现错误:", err)
        }
    }()
    num1 := 10
    num2 := 0
    result := num1 / num2
    fmt.Println(result)
    //此时被除数为0,不合法,于是出现panic。就可以用defer进行处理
}

func main() {
    //启动两个线程
    go printNum()
    go devide()

    time.Sleep(time.Second * 5)
}

/*
1
2
3
4
5
6
7
8
9
10
devide()出现错误: runtime error: integer divide by zero
*/
0

评论区