并发
- 作为语言的核心部分,
Go
提供了并发的特性。 - 这一部分概览了
goroutine
和channel
,以及如何使用它们来实现不同的并发模式。 Go
将并发结构作为核心语言的一部分提供。本节课程通过一些示例介绍并展示了它们的用法。
Go 程
- Go 程(
goroutine
)_ 是由Go
运行时管理的轻量级线程。
go f(x, y, z)
- 会启动一个新的
Go
程并执行
f(x, y, z)
- f 、 x 、 y 和 z 的求值发生在当前的
Go
程中,而f
的执行发生在新的Go
程中。 Go
程在相同的地址空间中运行,因此在访问共享的内存时必须进行同步。sync
包提供了这种能力,不过在Go
中并不经常用到,因为还有其它的办法(见下一页)。
代码:
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
信道
- 信道是带有类型的管道,你可以通过它用信道操作符
<-
来发送或者接收值。
ch <- v //将 v 发送至信道ch
v := <- ch //从 ch 接收值并赋予 v。
(“箭头”就是数据流的方向。)
- 和映射与切片一样,信道在使用前必须创建
ch := make(chan int)
- 默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得
Go
程可以在没有显式的锁或竞态变量的情况下进行同步。 - 以下示例对切片中的数进行求和,将任务分配给两个
Go
程。 一旦两个Go
程完成了它们的计算,它就能算出最终的结果。
package main
import "fmt"
func sum(s [] int,c chan int) {
sum := 0
for _,v := range s {
sum += v
}
c <- sum // 将和送入 c
}
func main() {
s := [] int {7,2,-3,2,4,-5}
c := make(chan int)
go sum(s[:len(s)/2],c)
go sum(s[len(s)/2:],c)
x , y := <-c,<-c // 从 c 中接收
fmt.Println(x, y, x+y)
}
思考:向同一个信道多次发送值,接受的顺序如何?信道中的值存储的数据结构如何?
- 通过后面的练习,发现信道有点类似于一个阻塞队列,先进先出,信道无值阻塞信道输出,信道缓冲区满了阻塞向信道输入值
带缓冲的信道
- 信道可以是 带缓冲的 。将缓冲长度作为第二个参数提供给 make 来初始化一个带缓冲的信道:
- 不指定缓冲区大小的信道创建
make(chan int)
,缓冲区大小为0,如果不在发送值到信道之前创建一个goroutine
去读取值那么当前的goroutine
会一直阻塞。参考链接
ch := make(chan int, 100)
-
仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。
-
修改示例填满缓冲区,然后看看会发生什么。
-
不阻塞代码
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
- 执行结果
1
2
- 发送数据阻塞代码
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(<-ch)
fmt.Println(<-ch)
}
- 执行结果
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox937270155/main.go:9 +0xa0
- 只发送不创建goroutine接受
package main
import "fmt"
func main() {
c := make(chan int)//此时buffer size默认为0,当前go程将一直阻塞(或者叫死锁)
c <- 1
fmt.Println("test")
}
- 结果
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
- 发送并且创建goroutine接受
package main
import "fmt"
func main() {
c := make(chan int)
//注意这里goroutine必须创建在给信道传值之前,否则在`c <- 1`就阻塞了,根本执行不到下面的代码
go func() {
t := <-c
fmt.Println(t)
}()
c <- 1
fmt.Println("test")
}
- 结果
1
test
range 和 close
- 发送者可通过
close
关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完之后ok
会被设置为false
。
v, ok := <-ch
- 循环
for i := range c
会不断从信道接收值,直到它被关闭。 注意: 只有发送者才能关闭信道,而接收者不能。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。 还要注意: 信道与文件不同,通常情况下无需关闭它们。只有在必须告诉接收者不再有值需要发送的时候才有必要关闭,例如终止一个 range 循环。
代码:
package main
import "fmt"
func fibonacci(n int,c chan int){
x,y := 0,1
for i := 0; i < n; i++{
c <-x
x,y = y,x+y
}
close(c)
}
func main(){
c := make(chan int,10)
go fibonacci(cap(c),c)
for i := range c{
fmt.Println(i)
}
}
结果
0
1
1
2
3
5
8
13
21
34
select 语句
select
语句使一个Go
程可以等待多个通信操作。select
会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。- 注意到
select
的代码形式和switch
非常相似, 不过select
的case
里的操作语句只能是【IO
操作】 。此示例里面select
会一直等待等到某个case
语句完成, 也就是等到成功从ch1
或者ch2
中读到数据。 则select
语句结束。
代码:
package main
import "fmt"
func fibonacci (c, quit chan int){
x, y := 0, 1
for{
select{
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
//goroutine1
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
//goroutine2
fibonacci(c, quit)
}
- 执行结果
0
1
1
2
3
5
8
13
21
34
quit
- 分析1:这里创建的两个信道buffer size都是0,意味着如果没有新的goroutine来接收值,只发送的话当前goroutine都会被阻塞
- 分析2:
fmt.Println(<-c)
中刚开始信道没有值,因此该goroutine
(goroutine1
)会一直被阻塞 - 分析3:执行到
fibonacci(c, quit)
后,for
循环中一直给信道c
中传值,此时goroutine1
被激活,取完值后再次进入阻塞状态,如此循环 - 分析4:当
goroutine1
中10此for循环执行完毕,信道c一直处于阻塞状态,此时执行quit <- 0
此时信道quit
被激活,执行fibonacci
中的case <-quit
,最终程序退出。
默认选择
- 当
select
中的其它分支都没有准备好时,default
分支就会执行。 - 为了在尝试发送或者接收时不发生阻塞,可使用 default 分支:
select {
case i := <-c:
// 使用 i
default:
// 从 c 中接收会阻塞时执行
}
- 代码
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
- 结果:
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
BOOM!
- 分析:
time.Tick
每隔指定的时间往信道输入值,time.After
指定时间后往把当前时间发送到信道
练习:等价二叉查找树
- 不同二叉树的叶节点上可以保存相同的值序列。例如,以下两个二叉树都保存了序列
1,1,2,3,5,8,13
。 - 在大多数语言中,检查两个二叉树是否保存了相同序列的函数都相当复杂。 我们将使用 Go 的并发和信道来编写一个简单的解法。
- 本例使用了 tree 包,它定义了类型:
type Tree struct {
Left *Tree
Value int
Right *Tree
}
- 实现
Walk
函数。 - 测试
Walk
函数。 函数tree.New(k)
用于构造一个随机结构的已排序二叉查找树,它保存了值k 、 2k 、 3k ... 10k
。 创建一个新的信道ch
并且对其进行步进:
go Walk(tree.New(1), ch)
然后从信道中读取并打印 10
个值。应当是数字 1, 2, 3, ..., 10
。
3. 用 Walk
实现 Same
函数来检测 t1
和 t2
是否存储了相同的值。
4. 测试 Same
函数。
Same(tree.New(1), tree.New(1)
) 应当返回 true
,而 Same(tree.New(1)
,
tree.New(2))
应当返回 false
。
代码
package main
import (
"golang.org/x/tour/tree"
"fmt"
)
/**
二叉树小知识
先序遍历:遍历顺序规则为【根左右】
中序遍历:遍历顺序规则为【左根右】
后序遍历:遍历顺序规则为【左右根】
*/
// Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
// 此处使用中序遍历
func Walk(t *tree.Tree, ch chan int) {
if t.Left != nil{
Walk(t.Left, ch)
}
ch <- t.Value
if t.Right != nil{
Walk(t.Right, ch)
}
}
// Same 检测树 t1 和 t2 是否含有相同的值。
func Same(t1, t2 *tree.Tree) bool {
ch1 := make(chan int)
ch2 := make(chan int)
go Walk(t1,ch1)
go Walk(t2,ch2)
for i := 0; i < 10; i++ {
if <-ch1 != <-ch2{
return false
}
}
return true
}
func main() {
ch := make(chan int)
go Walk(tree.New(1), ch)
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
fmt.Println(Same(tree.New(1),tree.New(1)))
fmt.Println(Same(tree.New(1),tree.New(2)))
}
- 可优化的地方,对于
Walk
函数,上面的实现没有close channle
因此不能使用range
遍历channel
中的元素改进的方法如下
func Walk(t *tree.Tree, ch chan int) {
ReWalk(t,ch)
close(ch)
}
func ReWalk(t *tree.Tree, ch chan int) {
if t.Left != nil{
Walk(t.Left, ch)
}
ch <- t.Value
if t.Right != nil{
Walk(t.Right, ch)
}
}
func main() {
ch := make(chan int)
go Walk(tree.New(1), ch)
for i := range ch{
fmt.Println(i)
}
fmt.Println(Same(tree.New(1),tree.New(1)))
fmt.Println(Same(tree.New(1),tree.New(2)))
}
sync.Mutex
- 我们已经看到信道非常适合在各个
Go
程间进行通信。 - 但是如果我们并不需要通信呢?比如说,若我们只是想保证每次只有一个
Go
程能够访问一个共享的变量,从而避免冲突? - 这里涉及的概念叫做 互斥(
mutual_exclusion
)_ ,我们通常使用 互斥锁(Mutex
)_ 这一数据结构来提供这种机制。 Go
标准库中提供了sync.Mutex
互斥锁类型及其两个方法:
Lock
Unlock
- 我们可以通过在代码前调用 Lock 方法,在代码后调用 Unlock 方法来保证一段代码的互斥执行。 参见
Inc
方法。 - 我们也可以用
defer
语句来保证互斥锁一定会被解锁。参见Value
方法。
代码:
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
v map[string] int
mux sync.Mutex
}
// Inc 增加给定 key 的计数器的值。
func (c * SafeCounter)Inc(key string){
c.mux.Lock()
c.v[key]++
c.mux.Unlock()
}
// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter)Value(key string)int {
c.mux.Lock()
defer c.mux.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v:make(map[string] int)}
for i := 0;i < 100;i++{
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
sync.WaitGroup
我们往往会遇到这样的场景,有多个任务(goroutine)在执行,需要等待最后一个任务执行结束,程序才能推出,针对这种场景我们可以使用 sync.WaitGroup
- 不等待直接退出(有问题程序)
func say(s string) {
time.Sleep(1000 * time.Millisecond)
fmt.Println(s)
}
func TestScanTheHost(t *testing.T) {
go say("world")
fmt.Println("test")
}
say
方法执行时间很长需要一秒左右,主 go
程中没有等待 say
就直接执行退出了,导致 go say("world")
得不到执行
- 等待执行
func say(s string,wg sync.WaitGroup) {
defer wg.Done()
time.Sleep(1000 * time.Millisecond)
fmt.Println(s)
}
func TestScanTheHost(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go say("world",wg)
fmt.Println("test")
wg.Wait()
}
一开始这么写的,发现一直在等待,调查后发现这里有个坑,原来是 golang
里如果方法传递的不是地址,那么就会做一个拷贝,所以这里调用的 wg
根本就不是一个对象。简单更改如下:
//注意这里是 wg 的引用
func say(s string,wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(1000 * time.Millisecond)
fmt.Println(s)
}
func TestScanTheHost(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go say("world",&wg)
fmt.Println("test")
wg.Wait()
}
输出结果正常
test
world
本文由 zealzhangz 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2019/07/13 16:11