前言

之前从未接触过并发编程的童鞋可放心食用!

并发与并行

  • 并发 (Concurrency):
    并发是指同时处理多个任务的能力,但这些任务不一定是同时执行的。任务可能通过切换来共享时间片。
  • 并行 (Parallelism):
    并行是指多个任务同时执行,需要多个处理器核心支持。在 Go 中,Goroutine 是通过调度器来实现并发,最终通过 CPU 核心实现并行。

并发和并行的关系:
并发是逻辑上的任务切换,而并行是物理上的任务同时运行。

Go的并发核心:Goroutine

Goroutine是什么

Goroutine 是 Go 的协程实现,是一种比传统线程更轻量级的并发任务单元。

  • 轻量:一个 Goroutine 启动时仅占用大约 2 KB 的内存(相比线程,通常占用 1 MB 堆栈)。
  • 调度灵活:由 Go 的运行时(runtime)管理调度,而不是直接依赖操作系统内核线程。
  • 高效:Go 的运行时会动态调整 Goroutine 的栈大小(最小 2 KB,最大可达 1 GB),并通过用户态线程池实现调度。

协程 vs 线程:

  • 线程:由操作系统调度,重量级,开销较大(如栈内存固定且较大、上下文切换代价高)。
  • 协程:由语言运行时调度,轻量级,开销小(动态栈、用户态上下文切换)。

创建和使用 Goroutine

创建一个 Goroutine 的方法非常简单,使用 go 关键字即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
"time"
)

func task() {
fmt.Println("Task is running")
}

func main() {
go task() // 启动一个 Goroutine
fmt.Println("Main function")
time.Sleep(1 * time.Second) // 确保主 Goroutine 不退出
}

运行结果(顺序不可预测,Task is running可能都不会输出):

1
2
Main function
Task is running
  • go task():启动一个新的 Goroutine,运行 task 函数。
  • 主 Goroutine 继续执行 fmt.Println(“Main function”)。
  • 如果没有 time.Sleep,主 Goroutine 会立即退出,程序结束,task 可能尚未完成。

Goroutine的生命周期

(1) 启动 Goroutine

  • Goroutine 是通过 go 关键字启动的。
  • 每个 Goroutine 都会独立执行其任务,但与其他 Goroutine 和主 Goroutine 共享相同的地址空间。

(2) 运行中

  • Goroutine 的调度由 Go 运行时管理,运行时会将多个 Goroutine 映射到少量的系统线程(M:N 模型)。
  • Goroutine 的执行顺序不确定,由运行时的调度算法决定。

(3) 结束

  • Goroutine 执行完成或返回时会自动退出,无需显式销毁。

Goroutine 之间的同步与通信工具

sync包

sync 提供了一组基础的同步原语,用于解决 Goroutine 之间的同步问题,确保共享数据在多 Goroutine 环境下安全访问。

常见的 sync 工具

(1) sync.WaitGroup
作用:等待一组 Goroutine 完成。
WaitGroup 提供了三个方法:

  • Add(delta int):设置需要等待的 Goroutine 数量(+delta)。
  • Done():每当一个 Goroutine 完成任务,调用一次,表示减少一个需要等待的任务(-1)。
  • Wait():阻塞主 Goroutine,直到所有计数归零。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    package main

    import (
    "fmt"
    "sync"
    )

    func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 减少计数
    //defer保证这句话总是在worker函数结束的时候执行,控制进程减1
    fmt.Printf("Worker %d is starting\n", id)
    // 模拟工作任务
    fmt.Printf("Worker %d is done\n", id)
    }

    func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
    wg.Add(1) // 增加计数
    go worker(i, &wg)
    }
    wg.Wait() // 阻塞,直到所有任务完成
    fmt.Println("All workers are done")
    }
    注:Goroutines 是按照 for 循环的顺序启动的(1, 2, 3)。这意味着 Goroutines 被创建时的顺序是确定的。实际执行顺序取决于 Go 运行时调度器,调度器会决定哪个 Goroutine 先运行。

channel

Channel 是 Go 语言中用于 在 Goroutines 之间传递数据的核心机制之一。它是 Go 中并发编程的一个重要特性,可以让不同的 Goroutines 安全地通信和同步。Channel 是一种类型化的管道,可以用于在多个 Goroutines 之间传递消息,且无需使用锁(mutex)来保证同步。

基本概念

声明与初始化
使用 make 函数创建一个 channel,并可以指定容量(缓冲区大小)。

1
2
3
4
5
//Type是通道的数据类型
var ch chan Type // 声明一个空的 channel
ch = make(chan Type) // 创建一个 channel
ch := make(chan Type) // 声明并初始化一个 channel
//Type 是通过 channel 传递的数据类型,表示你希望在 channel 中传递的数据类型。

发送数据&接收数据
1
2
ch <- data  // 将数据发送到 channel
data := <-ch // 从 channel 接收数据并赋值给变量

关闭 Channel
1
2
3
close(ch)  // 关闭 channel
//使用 close(ch) 来关闭一个 channel,表示没有更多的数据会发送到该 channel。
//close 主要用于通知接收方,数据传输已经结束,接收方可以在接收数据时检查 channel 是否已关闭。

通道方向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//通道:传送带
//只读:只能从传送带拿走东西
//只写:只能把东西放到传送带上

package main

func onre(ch <-chan int) int { //只读,读出来
Gol := <-ch
if Gol == 1 {
return 1
}
return 0
}

func onwr(ch chan<- int) int { //只写,写进去
ch <- 30
return 0
}

func main() {
//
}

优雅的超时处理

凭借通道和select可以轻松完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//优雅的超时处理

package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
select {
case msg := <-c1:
fmt.Println(msg)
case <-time.After(2 * time.Second):
fmt.Println("timeout 1")
}
}

缓冲区与非缓冲区

  • 非缓冲 channel:
    发送方和接收方必须同时准备好,才能完成数据传递。发送方在发送数据时会阻塞,直到有接收方准备好接收数据。
  • 缓冲 channel:
    可以预设一个缓冲区大小,在发送数据时,若缓冲区未满,发送方不会阻塞。只有当缓冲区满时,发送方才会被阻塞。接收方也不会立即阻塞,直到缓冲区为空时

非缓冲channel示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "fmt"

func main() {
ch := make(chan string) // 创建一个非缓冲 channel

// 启动一个 Goroutine 发送数据
go func() {
ch <- "Hello, Go!" // 发送数据到 channel
}()

// 接收数据并输出
message := <-ch // 从 channel 接收数据
fmt.Println(message) // 输出接收到的消息
}

  • 主 Goroutine 会阻塞在 message := <-ch 直到接收到来自其他 Goroutine 的数据。
  • ch <- “Hello, Go!” 会将数据发送到 channel,并且发送操作会阻塞,直到主 Goroutine 执行完 <-ch 来接收数据。

缓冲channel示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import "fmt"

func main() {
ch := make(chan string, 2) // 创建一个缓冲区大小为 2 的 channel

// 启动一个 Goroutine 发送数据
go func() {
ch <- "Message 1"
ch <- "Message 2"
}()

// 接收并输出数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}

  • 创建了一个缓冲区大小为 2 的 channel,因此可以在没有阻塞的情况下向 channel 发送 2 条消息。
  • 当我们从 channel 中接收数据时,如果缓冲区不为空,就能顺利获取到数据。
  • 若发送3条消息,则会阻塞,直到缓冲区被释放

Channel 的同步与 Goroutines 协作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"time"
)

func worker(id int, ch chan bool) {
fmt.Printf("Worker %d is starting\n", id)
time.Sleep(time.Second) // 模拟任务执行
fmt.Printf("Worker %d is done\n", id)
ch <- true // 通知任务完成
}

func main() {
ch := make(chan bool, 3) // 创建一个缓冲区大小为 3 的 channel

for i := 1; i <= 3; i++ {
go worker(i, ch) // 启动多个 Goroutine
}

// 等待所有 Goroutine 完成
for i := 1; i <= 3; i++ {
<-ch // 从 channel 接收任务完成的信号
}

fmt.Println("All workers are done")
}
  • 这里使用了一个 缓冲 channel 来通知主 Goroutine 每个子 Goroutine 的任务完成。每个工作 Goroutine 在完成任务后通过 ch <- true 向 channel 发送信号。
  • 主 Goroutine 在 for 循环中通过 <-ch 等待所有任务的完成,直到接收到所有的完成信号。