Go高级篇

并发

串行、并发与并行

串行:我们都是先读小学,小学毕业后再读初中,读完初中再读高中。

并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。

进程、线程和协程

进程(process):程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

线程(thread):操作系统基于进程开启的轻量级进程,是操作系统调度执行的最小单位。

协程(coroutine):非操作系统提供而是由用户自行创建和控制的用户态‘线程’,比线程更轻量级。

goroutine

启动 goroutine 只需要在调用函数(普通函数和匿名函数)前加上一个 go 关键字

func hello() {
	fmt.Println("hello")
}

func main() {
	go hello()
	fmt.Println("你好")
	time.Sleep(time.Second) // 表示延时1秒
}

输出:

在 Go 程序启动时,Go 程序就会为 main 函数创建一个默认的 goroutine。在上面的代码中我们在 main 函数中使用 go 关键字创建了另外一个 goroutine 去执行 hello 函数,在程序中创建 goroutine 执行函数需要一定的开销,而此时 main goroutine 还在继续往下执行,我们的程序中此时存在两个并发执行的 goroutine。当 main 函数结束时整个程序也就结束了,同时 main goroutine 也结束了,所有由 main goroutine 创建的 goroutine 也会一同退出。所有我们需要对 main goroutine 进行一定的延时,等待其他 goroutine 执行完毕后再退出。

在上面的程序中使用 time.Sleep 让 main goroutine 等待 hello goroutine 执行结束是不优雅的,当然也是不准确的。

当你并不关心并发操作的结果或者有其它方式收集并发操作的结果时,WaitGroup 是实现等待一组并发操作完成的好方法。

下面的示例代码中我们在 main goroutine 中使用 sync.WaitGroup 来等待 hello goroutine 完成后再退出。

sync.WaitGroup

在代码中生硬的使用 time.Sleep 肯定是不合适的,Go语言中可以使用 sync.WaitGroup 来实现并发任务的同步。 sync.WaitGroup 有以下几个方法:

方法名
功能

func (wg * WaitGroup) Add(delta int)

计数器+delta

(wg *WaitGroup) Done()

计数器-1

(wg *WaitGroup) Wait()

阻塞直到计数器变为0

sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

需要注意sync.WaitGroup是一个结构体,进行参数传递的时候要传递指针。

将代码编译后再执行,得到的输出结果和之前一致,但是这一次程序不再会有多余的停顿,hello goroutine 执行完毕后程序直接退出。

启动多个goroutine

多次执行上面的代码会发现每次终端上打印数字的顺序都不一致。这是因为10个 goroutine 是并发执行的,而 goroutine 的调度是随机的。

动态栈

操作系统的线程一般都有固定的栈内存(通常为2MB),而 Go 语言中的 goroutine 非常轻量级,一个 goroutine 的初始栈空间很小(一般为2KB),所以在 Go 语言中一次创建数万个 goroutine 也是可能的。并且 goroutine 的栈不是固定的,可以根据需要动态地增大或缩小, Go 的 runtime 会自动为 goroutine 分配合适的栈空间。

goroutine调度

操作系统的线程会被操作系统内核调度时会挂起当前执行的线程并将它的寄存器内容保存到内存中,选出下一次要执行的线程并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。从一个线程切换到另一个线程需要完整的上下文切换。因为可能需要多次内存访问,索引这个切换上下文的操作开销较大,会增加运行的cpu周期。

区别于操作系统内核调度操作系统线程,goroutine 的调度是Go语言运行时(runtime)层面的实现,是完全由 Go 语言本身实现的一套调度系统——go scheduler。它的作用是按照一定的规则将所有的 goroutine 调度到操作系统线程上执行。

在经历数个版本的迭代之后,目前 Go 语言的调度器采用的是 GPM 调度模型。

  • G:表示 goroutine,每执行一次 go f() 就创建一个 G,包含要执行的函数和上下文信息。

  • 全局队列(Global Queue):存放等待运行的 G。

  • P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。

  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。

  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。

  • Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的, goroutine 则是由Go运行时(runtime)自己的调度器调度的,完全是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身 goroutine 的超轻量级,以上种种特性保证了 goroutine 调度方面的性能。

GOMAXPROCS

Go运行时的调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,GOMAXPROCS 默认为 8。Go语言中可以通过 runtime.GOMAXPROCS 函数设置当前程序并发时占用的 CPU逻辑核心数。(Go1.5版本之前,默认使用的是单核心执行。Go1.5 版本之后,默认使用全部的CPU 逻辑核心数。)

channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,很多并发模型中必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言采用的并发模型是 CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说 goroutine 是Go程序并发的执行体,channel 就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

channel 是 Go 语言中一种特有的类型。声明通道类型变量的格式如下:

其中:

  • chan:是关键字

  • 元素类型:是指通道中传递元素的类型

举几个例子:

channel零值

未初始化的通道类型变量其默认零值是 nil

初始化channel

声明的通道类型变量需要使用内置的 make 函数初始化之后才能使用

其中:channel的缓冲大小是可选的

channel操作

通道共有发送(send)、接收(receive)和关闭(close)三种操作。而发送和接收操作都使用 <- 符号

发送:将一个值发送到通道中

接收:从一个通道中接收值

关闭:通过调用内置的 close 函数来关闭通道

注意:一个通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致 panic。

  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。

  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。

  4. 关闭一个已经关闭的通道会导致 panic。

无缓冲的通道

无缓冲的通道又称为阻塞的通道

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

deadlock 表示我们程序中的 goroutine 都被挂起导致程序死锁,这因为我们使用 ch := make(chan int) 创建的是无缓冲的通道,无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理,如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。就像田径比赛中的4x100接力赛,想要完成交棒必须有一个能够接棒的运动员,否则只能等待。简单来说就是无缓冲的通道必须有至少一个接收方才能发送成功。

因为我们使用 ch := make(chan int) 创建的是无缓冲的通道,无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理,如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。就像田径比赛中的4x100接力赛,想要完成交棒必须有一个能够接棒的运动员,否则只能等待。简单来说就是无缓冲的通道必须有至少一个接收方才能发送成功。

上面的代码会阻塞在 ch <- 10 这一行代码形成死锁,那如何解决这个问题呢?

其中一种可行的方法是创建一个 goroutine 去接收值,例如:

首先无缓冲通道 ch 上的发送操作会阻塞,直到另一个 goroutine 在该通道上执行接收操作,这时数字10才能发送成功,两个 goroutine 将继续执行。相反,如果接收操作先执行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向该通道发送数字10。

使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道

有缓冲的通道

还有另外一种解决上面死锁问题的方法,那就是使用有缓冲区的通道。我们可以在使用 make 函数初始化通道时,可以为其指定通道的容量,例如:

只要通道的容量大于零,那么该通道就属于有缓冲的通道,通道的容量表示通道中最大能存放的元素数量。当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量,虽然我们很少会这么做。

多返回值模式

当向通道中发送完数据时,我们可以通过 close 函数来关闭通道。当一个通道被关闭后,再往该通道发送值会引发 panic,从该通道取值的操作会先取完通道中的值。通道内的值被接收完后再对通道执行接收操作得到的值会一直都是对应元素类型的零值。那我们如何判断一个通道是否被关闭了呢?

对一个通道执行接收操作时支持使用如下多返回值模式。

其中:

  • value:从通道中取出的值,如果通道被关闭则返回对应类型的零值。

  • ok:通道ch关闭时返回 false,否则返回 true。

下面代码片段中的 f2 函数会循环从通道 ch 中接收所有值,直到通道被关闭后退出。

for range接收值

通常我们会选择使用 for range 循环从通道中接收值,当通道被关闭后,会在通道内的所有值被接收完毕后会自动退出循环。上面那个示例我们使用 for range 改写后会很简洁。

**注意:**目前Go语言中并没有提供一个不对通道进行读取操作就能判断通道是否被关闭的方法。不能简单的通过 len(ch) 操作来判断通道是否被关闭。

单向通道

在某些场景下我们可能会将通道作为参数在多个任务函数间进行传递,通常我们会选择在不同的任务函数中对通道的使用进行限制,比如限制通道在某个函数中只能执行发送或只能执行接收操作。想象一下,我们现在有 ProducerConsumer 两个函数,其中 Producer 函数会返回一个通道,并且会持续将符合条件的数据发送至该通道,并在发送完成后将该通道关闭。而 Consumer 函数的任务是从通道中接收值进行计算,这两个函数之间通过 Processer 函数返回的通道进行通信。

从上面的示例代码中可以看出正常情况下 Consumer 函数中只会对通道进行接收操作,但是这不代表不可以在 Consumer 函数中对通道进行发送操作。作为 Producer 函数的提供者,我们在返回通道的时候可能只希望调用方拿到返回的通道后只能对其进行接收操作。但是我们没有办法阻止在 Consumer 函数中对通道进行发送操作。

Go语言中提供了单向通道来处理这种需要限制通道只能进行某种操作的情况。

其中,箭头 <- 和关键字 chan 的相对位置表明了当前通道允许的操作,这种限制将在编译阶段进行检测。另外对一个只接收通道执行close也是不允许的,因为默认通道的关闭操作应该由发送方来完成。

我们使用单向通道将上面的示例代码进行如下改造。

这一次,Producer 函数返回的是一个只接收通道,这就从代码层面限制了该函数返回的通道只能进行接收操作,保证了数据安全。很多读者看到这个示例可能会觉着这样的限制是多余的,但是试想一下如果 Producer 函数可以在其他地方被其他人调用,你该如何限制他人不对该通道执行发送操作呢?并且返回限制操作的单向通道也会让代码语义更清晰、更易读。

在函数传参及任何赋值操作中全向通道(正常通道)可以转换为单向通道,但是无法反向转换。

总结

下面的表格中总结了对不同状态下的通道执行相应操作的结果。

**注意:**对已经关闭的通道再执行 close 也会引发 panic。

select多路复用

在某些场景下我们可能需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。你也许会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值。Go 语言内置了 select 关键字,使用它可以同时响应多个通道的操作。

Select 语句具有以下特点。

  • 可处理一个或多个 channel 的发送/接收操作。

  • 如果多个 case 同时满足,select 会随机选择一个执行。

  • 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。

下面的示例代码能够在终端打印出10以内的奇数,我们借助这个代码片段来看一下 select 的具体使用。

输出:

示例中的代码首先是创建了一个缓冲区大小为1的通道 ch,在进入 for 循环后,此时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以 x := <-c 这个 case 分支不满足,而 ch <- i 这个分支可以执行,会把1发送到通道中,结束本次 for 循环;第二次 for 循环时,i = 2,由于通道缓冲区已满,所以 ch <- i 这个分支不满足,而 x := <-ch 这个分支可以执行,从通道接收值1并赋值给变量 x ,所以会在终端打印出 1;后续的 for 循环同理会依次打印出3、5、7、9。

通道误用示例

示例1

将上述代码编译执行后,匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。因为 task := <- ch 的接收操作在通道被关闭后会一直接收到零值,而不会退出。此处的接收操作应该使用 task, ok := <- ch,通过判断布尔值 ok 为假时退出;或者使用 select 来处理通道。

示例2

上述代码片段可能导致 goroutine 泄露(goroutine 并未按预期退出并销毁)。由于 select 命中了超时逻辑,导致通道没有消费者(无接收操作),而其定义的通道为无缓冲通道,因此 goroutine 中的 ch <- "job result" 操作会一直阻塞,最终导致 goroutine 泄露。

并发安全和锁

有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生 竞态问题(数据竞态)。这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。

我们将上面的代码编译后执行,不出意外每次执行都会输出诸如9537、5865、6527等不同的结果。这是为什么呢?

在上面的示例代码片中,我们开启了两个 goroutine 分别执行 add 函数,这两个 goroutine 在访问和修改全局的 x 变量时就会存在数据竞争,某个 goroutine 中对全局变量 x 的修改可能会覆盖掉另一个 goroutine 中的操作,所以导致最后的结果与预期不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用 sync 包中提供的 Mutex 类型来实现互斥锁。

sync.Mutex 提供了两个方法供我们使用。

方法名
功能

func (m *Mutex) Lock()

获取互斥锁

func (m *Mutex) Unlock()

释放互斥锁

我们在下面的示例代码中使用互斥锁限制每次只有一个 gorouti

将上面的代码编译后多次执行,每一次都会得到预期中的结果——10000。

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用 sync 包中的 RWMutex 类型。

sync.RWMutex 提供了以下5个方法。

方法名
功能

func (rw *RWMutex) Lock()

获取写锁

func (rw *RWMutex) Unlock()

释放写锁

func (rw *RWMutex) RLock()

获取读锁

func (rw *RWMutex) RUnlock()

释放读锁

func (rw *RWMutex) RLocker() Locker

返回一个实现Locker接口的读写锁

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

下面我们使用代码构造一个读多写少的场景,然后分别使用互斥锁和读写锁查看它们的性能差异。

从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来。

sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。

Go语言中的 sync 包中提供了一个针对只执行一次场景的解决方案—— sync.Oncesync.Once 只有一个 Do 方法,其签名如下:

**注意:**如果要执行的函数 f 需要传递参数就需要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:

多个 goroutine 并发调用 Icon 函数时不是并发安全的,现代的编译器和CPU可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

在这种情况下就会出现即使判断了 icons 不是 nil 也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的 goroutine 操作,但是这样做又会引发性能问题。

使用 sync.Once 改造的示例代码如下:

并发安全的单例模式

下面是借助 sync.Once 实现的并发安全的单例模式:

sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go 语言中内置的 map 不是并发安全的,请看下面这段示例代码。

将上面的代码编译后执行,会报出 fatal error: concurrent map writes 错误。我们不能在多个 goroutine 中并发对内置的 map 进行读写操作,否则会存在数据竞争问题。

像这种场景下就需要为 map 加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版 map—— sync.Map。开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。同时 sync.Map 内置了诸如 StoreLoadLoadOrStoreDeleteRange 等操作方法。

方法名
功能

func (m *Map) Store(key, value interface{})

存储key-value数据

func (m *Map) Load(key interface{}) (value interface{}, ok bool)

查询key对应的value

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)

查询或存储key对应的value

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool)

查询并删除key

func (m *Map) Delete(key interface{})

删除key

func (m *Map) Range(f func(key, value interface{}) bool)

对map中的每个key-value依次调用f

下面的代码示例演示了并发读写 sync.Map

原子操作

针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。Go语言中原子操作由内置的标准库sync/atomic提供。

atomic包

方法
解释

func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

读取操作

func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

写入操作

func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

修改操作

func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

交换操作

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

比较并交换操作

示例

我们填写一个示例来比较下互斥锁和原子操作的性能。

atomic 包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者 sync 包的函数/类型实现同步更好。

处理并发错误

recover goroutine 中的 panic

我们知道可以在代码中使用 recover 来会恢复程序中意想不到的 panic,而 panic 只会触发当前 goroutine 中的 defer 操作。

例如在下面的示例代码中,无法在 main 函数中 recover 另一个 goroutine 中引发的 panic。

输出:

从输出结果可以看到程序并没有正常退出,而是由于 panic 异常退出了(exit code 2)。

正如上面示例演示的那样,在启用 goroutine 去执行任务的场景下,如果想要 recover goroutine 中可能出现的 panic 就需要在 goroutine 中使用 recover。就像下面的 f2 函数那样。

输出:

程序中的 panic 被 recover 成功捕获,程序最终正常退出。

errgroup

在以往演示的并发示例中,我们通常像下面的示例代码那样在 go 关键字后,调用一个函数或匿名函数。

在之前讲解并发的代码示例中我们默认被并发的那些函数都不会返回错误,但真实的情况往往是事与愿违。

当我们想要将一个任务拆分成多个子任务交给多个 goroutine 去运行,这时我们该如何获取到子任务可能返回的错误呢?

假设我们有多个网址需要并发去获取它们的内容,这时候我们会写出类似下面的代码。

执行上述fetchUrlDemo函数得到如下输出结果,由于 http://www.yixieqitawangzhi.com 是我随意编造的一个并不真实存在的 url,所以对它的 HTTP 请求会返回错误。

在上面的示例代码中,我们开启了 3 个 goroutine 分别去获取3个 url 的内容。类似这种将任务分为若干个子任务的场景会有很多,那么我们如何获取子任务中可能出现的错误呢?

errgroup 包就是为了解决这类问题而开发的,它能为处理公共任务的子任务而开启的一组 goroutine 提供同步、error 传播和基于context 的取消功能。

errgroup 包中定义了一个 Group 类型,它包含了若干个不可导出的字段。

errgroup.Group 提供了GoWait两个方法。

  • Go 函数会在新的 goroutine 中调用传入的函数f。

  • 第一个返回非零错误的调用将取消该Group;下面的Wait方法会返回该错误

  • Wait 会阻塞直至由上述 Go 方法调用的所有函数都返回,然后从它们返回第一个非nil的错误(如果有)。

下面的示例代码演示了如何使用 errgroup 包来处理多个子任务 goroutine 中可能返回的 error。

执行上面的fetchUrlDemo2函数会得到如下输出结果。

当子任务的 goroutine 中对http://www.yixieqitawangzhi.com 发起 HTTP 请求时会返回一个错误,这个错误会由 errgroup.Group 的 Wait 方法返回。

通过阅读下方 errgroup.Group 的 Go 方法源码,我们可以看到当任意一个函数 f 返回错误时,会通过g.errOnce.Do只将第一个返回的错误记录,并且如果存在 cancel 方法则会调用cancel。

那么如何创建带有 cancel 方法的 errgroup.Group 呢?

答案是通过 errorgroup 包提供的 WithContext 函数。

WithContext 函数接收一个父 context,返回一个新的 Group 对象和一个关联的子 context 对象。下面的代码片段是一个官方文档给出的示例。

或者这里有另外一个示例。

网络编程

互联网协议介绍

互联网的核心是一系列协议,总称为”互联网协议”(Internet Protocol Suite),正是这一些协议规定了电脑如何连接和组网。我们理解了这些协议,就理解了互联网的原理。由于这些协议太过庞大和复杂,没有办法在这里一概而全,只能介绍一下我们日常开发中接触较多的几个协议。

互联网分层模型

如上图所示,互联网按照不同的模型划分会有不用的分层,但是不论按照什么模型去划分,越往上的层越靠近用户,越往下的层越靠近硬件。在软件开发中我们使用最多的是上图中将互联网划分为五个分层的模型。

接下来我们一层一层的自底向上介绍一下每一层。

物理层

我们的电脑要与外界互联网通信,需要先把电脑连接网络,我们可以用双绞线、光纤、无线电波等方式。这就叫做”实物理层”,它就是把电脑连接起来的物理手段。它主要规定了网络的一些电气特性,作用是负责传送0和1的电信号。

数据链路层

单纯的0和1没有任何意义,所以我们使用者会为其赋予一些特定的含义,规定解读电信号的方式:例如:多少个电信号算一组?每个信号位有何意义?这就是”数据链接层”的功能,它在”物理层”的上方,确定了物理层传输的0和1的分组方式及代表的意义。早期的时候,每家公司都有自己的电信号分组方式。逐渐地,一种叫做”以太网”(Ethernet)的协议,占据了主导地位。

以太网规定,一组电信号构成一个数据包,叫做”帧”(Frame)。每一帧分成两个部分:标头(Head)和数据(Data)。其中”标头”包含数据包的一些说明项,比如发送者、接受者、数据类型等等;”数据”则是数据包的具体内容。”标头”的长度,固定为18字节。”数据”的长度,最短为46字节,最长为1500字节。因此,整个”帧”最短为64字节,最长为1518字节。如果数据很长,就必须分割成多个帧进行发送。

那么,发送者和接受者是如何标识呢?以太网规定,连入网络的所有设备都必须具有”网卡”接口。数据包必须是从一块网卡,传送到另一块网卡。网卡的地址,就是数据包的发送地址和接收地址,这叫做MAC地址。每块网卡出厂的时候,都有一个全世界独一无二的MAC地址,长度是48个二进制位,通常用12个十六进制数表示。前6个十六进制数是厂商编号,后6个是该厂商的网卡流水号。有了MAC地址,就可以定位网卡和数据包的路径了。

我们会通过ARP协议来获取接受方的MAC地址,有了MAC地址之后,如何把数据准确的发送给接收方呢?其实这里以太网采用了一种很”原始”的方式,它不是把数据包准确送到接收方,而是向本网络内所有计算机都发送,让每台计算机读取这个包的”标头”,找到接收方的MAC地址,然后与自身的MAC地址相比较,如果两者相同,就接受这个包,做进一步处理,否则就丢弃这个包。这种发送方式就叫做”广播”(broadcasting)。

网络层

按照以太网协议的规则我们可以依靠MAC地址来向外发送数据。理论上依靠MAC地址,你电脑的网卡就可以找到身在世界另一个角落的某台电脑的网卡了,但是这种做法有一个重大缺陷就是以太网采用广播方式发送数据包,所有成员人手一”包”,不仅效率低,而且发送的数据只能局限在发送者所在的子网络。也就是说如果两台计算机不在同一个子网络,广播是传不过去的。这种设计是合理且必要的,因为如果互联网上每一台计算机都会收到互联网上收发的所有数据包,那是不现实的。

因此,必须找到一种方法区分哪些MAC地址属于同一个子网络,哪些不是。如果是同一个子网络,就采用广播方式发送,否则就采用”路由”方式发送。这就导致了”网络层”的诞生。它的作用是引进一套新的地址,使得我们能够区分不同的计算机是否属于同一个子网络。这套地址就叫做”网络地址”,简称”网址”。

“网络层”出现以后,每台计算机有了两种地址,一种是MAC地址,另一种是网络地址。两种地址之间没有任何联系,MAC地址是绑定在网卡上的,网络地址则是网络管理员分配的。网络地址帮助我们确定计算机所在的子网络,MAC地址则将数据包送到该子网络中的目标网卡。因此,从逻辑上可以推断,必定是先处理网络地址,然后再处理MAC地址。

规定网络地址的协议,叫做IP协议。它所定义的地址,就被称为IP地址。目前,广泛采用的是IP协议第四版,简称IPv4。IPv4这个版本规定,网络地址由32个二进制位组成,我们通常习惯用分成四段的十进制数表示IP地址,从0.0.0.0一直到255.255.255.255。

根据IP协议发送的数据,就叫做IP数据包。IP数据包也分为”标头”和”数据”两个部分:”标头”部分主要包括版本、长度、IP地址等信息,”数据”部分则是IP数据包的具体内容。IP数据包的”标头”部分的长度为20到60字节,整个数据包的总长度最大为65535字节。

传输层

有了MAC地址和IP地址,我们已经可以在互联网上任意两台主机上建立通信。但问题是同一台主机上会有许多程序都需要用网络收发数据,比如QQ和浏览器这两个程序都需要连接互联网并收发数据,我们如何区分某个数据包到底是归哪个程序的呢?也就是说,我们还需要一个参数,表示这个数据包到底供哪个程序(进程)使用。这个参数就叫做”端口”(port),它其实是每一个使用网卡的程序的编号。每个数据包都发到主机的特定端口,所以不同的程序就能取到自己所需要的数据。

“端口”是0到65535之间的一个整数,正好16个二进制位。0到1023的端口被系统占用,用户只能选用大于1023的端口。有了IP和端口我们就能实现唯一确定互联网上一个程序,进而实现网络间的程序通信。

我们必须在数据包中加入端口信息,这就需要新的协议。最简单的实现叫做UDP协议,它的格式几乎就是在数据前面,加上端口号。UDP数据包,也是由”标头”和”数据”两部分组成:”标头”部分主要定义了发出端口和接收端口,”数据”部分就是具体的内容。UDP数据包非常简单,”标头”部分一共只有8个字节,总长度不超过65,535字节,正好放进一个IP数据包。

UDP协议的优点是比较简单,容易实现,但是缺点是可靠性较差,一旦数据包发出,无法知道对方是否收到。为了解决这个问题,提高网络可靠性,TCP协议就诞生了。TCP协议能够确保数据不会遗失。它的缺点是过程复杂、实现困难、消耗较多的资源。TCP数据包没有长度限制,理论上可以无限长,但是为了保证网络的效率,通常TCP数据包的长度不会超过IP数据包的长度,以确保单个TCP数据包不必再分割。

应用层

应用程序收到”传输层”的数据,接下来就要对数据进行解包。由于互联网是开放架构,数据来源五花八门,必须事先规定好通信的数据格式,否则接收方根本无法获得真正发送的数据内容。”应用层”的作用就是规定应用程序使用的数据格式,例如我们TCP协议之上常见的Email、HTTP、FTP等协议,这些协议就组成了互联网协议的应用层。

如下图所示,发送方的HTTP数据经过互联网的传输过程中会依次添加各层协议的标头信息,接收方收到数据包之后再依次根据协议解包得到数据。

socket编程

Socket是BSD UNIX的进程通信机制,通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄。Socket可以理解为TCP/IP网络的API,它定义了许多函数或例程,程序员可以用它们来开发TCP/IP网络上的应用程序。电脑上运行的应用程序通常通过”套接字”向网络发出请求或者应答网络请求。

socket图解

Socket 是应用层与TCP/IP协议族通信的中间软件抽象层。在设计模式中,Socket 其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在 Socket 后面,对用户来说只需要调用Socket规定的相关函数,让 Socket 去组织符合指定的协议数据然后进行通信。

Go语言实现TCP通信

TCP协议

TCP/IP(Transmission Control Protocol/Internet Protocol) 即传输控制协议/网间协议,是一种面向连接(连接导向)的、可靠的、基于字节流的传输层(Transport layer)通信协议,因为是面向连接的协议,数据像水流一样传输,会存在黏包问题。

TCP服务端

一个TCP服务端可以同时连接很多个客户端,例如世界各地的用户使用自己电脑上的浏览器访问淘宝网。因为Go语言中创建多个 goroutine 实现并发非常方便和高效,所以我们可以每建立一次链接就创建一个 goroutine 去处理。

TCP服务端程序的处理流程:

  1. 监听端口

  2. 接收客户端请求建立链接

  3. 创建 goroutine 处理链接。

我们使用 Go 语言的 net 包实现的 TCP 服务端代码如下:

将上面的代码保存之后编译成 serverserver.exe 可执行文件。

TCP客户端

一个TCP客户端进行TCP通信的流程如下:

  1. 建立与服务端的链接

  2. 进行数据收发

  3. 关闭链接

使用Go语言的net包实现的TCP客户端代码如下:

将上面的代码编译成 clientclient.exe 可执行文件,先启动server端再启动client端,在client端输入任意内容回车之后就能够在server端看到client端发送的数据,从而实现TCP通信。

TCP黏包

示例

服务端代码如下:

客户端代码如下:

将上面的代码保存后,分别编译。先启动服务端再启动客户端,可以看到服务端输出结果如下:

客户端分10次发送的数据,在服务端并没有成功的输出10次,而是多条数据“粘”到了一起。

为什么会出现粘包

主要原因就是tcp数据传递模式是流模式,在保持长连接的时候可以进行多次的收和发。

“粘包”可发生在发送端也可发生在接收端:

  1. 由Nagle算法造成的发送端的粘包:Nagle算法是一种改善网络传输效率的算法。简单来说就是当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。

  2. 接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据。当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。

解决办法

出现”粘包”的关键在于接收方不确定将要传输的数据包的大小,因此我们可以对数据包进行封包和拆包的操作。

封包:封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部分内容了(过滤非法包时封包会加入”包尾”内容)。包头部分的长度是固定的,并且它存储了包体的长度,根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。

我们可以自己定义一个协议,比如数据包的前4个字节为包头,里面存储的是发送的数据的长度。

接下来在服务端和客户端分别使用上面定义的proto包的DecodeEncode函数处理数据。

服务端代码如下:

客户端代码如下:

Go语言实现UDP通信

UDP协议

UDP协议(User Datagram Protocol)中文名称是用户数据报协议,是OSI(Open System Interconnection,开放式系统互联)参考模型中一种无连接的传输层协议,不需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域。

UDP服务端

使用Go语言的net包实现的UDP服务端代码如下:

UDP客户端

使用Go语言的net包实现的UDP客户端代码如下:

Last updated