Goroutine&Channel

Goroutine协程

引入

某天产品经理闲着没事干,提出了以这样一个需求:

统计1~10000之间哪些是素数

当然,for循环秒了,但是你要循环多少次捏?

为了优化这个问题,引入了并发或者并行的方式,将任务分给多个goroutine去完成,这个时候就会使用到goroutine。

基本介绍

进程和线程

  1. 进程就是程序在操作系统中的一次执行过程,是系统进行戏院分配和调度的基本单位,
  2. 线程是进程执行的一个实例,是程序执行的最小单元,是比i进程更小的能独立运行的基本单位。
  3. 一个进程可以创建销毁多个线程,同一个进程的多个线程可以并发实行。
  4. 一个程序至少有一个进程,一个进程至少有一个线程

并发和并行

多线程在单核上运行叫并发,多线程程序在多核上运行叫并行

(这个好理解不多解释了喵,并非懒得打字

Go协程和主线程

一个GO线程上可以起多个协程,协程也可以理解成轻量级的线程

Go协程的特点

  1. 有独立的栈空间
  2. 共享程序堆空间
  3. 调度用用户控制
  4. 协程是轻量级的线程
  5. 足够轻量,在主协程里可以轻松起上万个协程,而且很稳定

快速入门

我们话不多说直接上案例:

请编写一个程序,实现如下功能:

  1. 在主线程开启一个协程,该协程间隔一秒输出一个hello world
  2. 主线程也每隔一秒输出一个hello StarRailway
  3. 要求主线程和协程同时执行
 package main
 ​
 import (
     "fmt"
     "strconv"
     "time"
 )
 /*先写一个每隔一秒输出一句话的函数*/
 func test() {
     for i := 0; i <= 10; i++ {
         fmt.Println("hello world" + strconv.Itoa(i))
         time.Sleep(time.Second)
     }
 }
 func main() {
     go test() /*开启了一个协程*/
     for i := 0; i <= 10; i++ {
         fmt.Println("hello Star Railway" + strconv.Itoa(i))
         time.Sleep(time.Second)
     }
 }
 ​
image-20260127194405908

????go test就行了?JaVALORANT玩家轻轻的哭了。

注意:如果主线程退出了,协程没执行完也会退出

小结:

  1. 主线程是一个物理线程,是直接作用在CPU上,是重量级的,非常消耗CPU资源
  2. 协程是从主线程开启的,是轻量级的,是逻辑态,对资源消耗相对较小
  3. Golang的协程机制是Golang的重要特点,其他编程语言的并发机制一般是基于线程的,开启过多的线程,资源消耗巨大,这里就凸显了Golang在并发上的巨大优势。

MPG模式

M:操作系统的主线程

P:协程执行需要的上下文

G:协程

我这里附上两篇文章大家自行阅读

MPG 模式的介绍 | 学习笔记-阿里云开发者社区

6.3 MPG 模型与并发调度单元 | Go 语言原本

以及视频

运行状态

状态1:

当前程序有一个M,这个M在执行一个G,且有三个G在协程队列中等候

状态2

M0主线程正在执行G0协程,此时G0协程阻塞了,比如说读文件或者数据库,这时候会创建另一个主线程M1(或者从线程池中取),并将等待的三个协程挂到M1下面执行,M0仍然执行文件的读写,

设置CPU数目

介绍:为了充分利用多CPU的优势,在golang中可以设置cpu的数目,go1.8之后默认让程序运行在多个核上,1.8之前还是要设置一下,更高效利用CPU

引用包

runtime

常用函数

NumCPUGOMAXPROCS

案例

 package main
 ​
 import (
     "fmt"
     "runtime"
 )
 ​
 func main() {
     cpuNum := runtime.NumCPU()
     fmt.Println("CPUNUM=", cpuNum)
 ​
     /*可以自己设置使用多个CPU*/
     runtime.GOMAXPROCS(cpuNum - 1)
     fmt.Println("over")
 }

Channel

引入

需求:

现在计算1-200各各数的阶乘,并且把各个数阶乘放入到map中,最后显示出来用goroutine实现

我们先来写一些,看看有什么问题

来我们先来写一个

 package main
 ​
 import "fmt"
 ​
 var (
  myMap = make(map[int]int, 10)
 )
 ​
 func test(n int) {
  res := 1
  for i := 1; i <= n; i++ {
  res *= i
  }
  myMap[n] = res
 }
 func main() {
  for i := 1; i <= 200; i++ {
  go test(i)
  }
  for i, v := range myMap {
  fmt.Printf("map[%d] = %d\n", i, v)
  }
 }
image-20260127212608966

?一堆报错?why?

问题

这里是由于多个 goroutine 同时对同一个 map 进行写操作,200个协程在写入同一块map空间。你觉得这安全吗QAQ,这里很容易出现竞争关系,我们可以通过go build -race main.go,来看看

此外,这里犹豫无法确定协程的运行时间但是,主线程只会傻傻的遍历完for就结束了,然后协程也会结束,这个时候我们到底有没有写入,我们不知道。我们这里还需要做一些其他操作。(Go高版本好像解决了这个问题,窝在跑的时候没遇到这里就提一嘴)

解决方案

加锁

通过堆map空间加锁,来产生队列,每次写入的时候加锁,操作完成后解锁,如果发现空间加锁,就按照队列进行排队。

我们就要用到这个包sync中的互斥锁

 package main
 ​
 import (
  "fmt"
  "sync"
 )
 var (
  myMap = make(map[int]int, 10)
  lock  sync.Mutex
 )
 ​
 func test(n int) {
  res := 1
  for i := 1; i <= n; i++ {
  res *= i
  }
  lock.Lock()
  myMap[n] = res
  lock.Unlock()
 }
 func main() {
  for i := 1; i <= 200; i++ {
  go test(i)
  }
  lock.Lock()
  for i, v := range myMap {
  fmt.Printf("map[%d] = %d\n", i, v)
  }
  lock.Unlock()
 }
image-20260127214839303

????叽里咕噜说啥呢我咋看不懂???,原来是阶乘超过了int的范围。。那没招了,我们调小点范围就好了

当然第二种方法就是用channel

基本介绍

对于全局变量加锁来实现通讯,不利于多个协程对全局变量的读写操作,这个时候新的通讯机制channel出现了。

channel的本质就是一个数据结构——队列

特点

  1. 数据先进先出
  2. 线程安全,多协程访问时,不需要加锁,不会发生资源竞争问题
  3. channel是有类型的,一个string的channel只能存放string类型的数据

快速入门

声明

 var intChan chan int
 var maoChan chan map[int]string

注意:

  1. channel是引用类型
  2. 必须初始化才能写入内容,需要make
  3. 有类型

初始化

 package main
 ​
 import "fmt"
 ​
 func main() {
  var intChan chan int
  intChan = make(chan int, 3)
  fmt.Println("intChan的值为:%v,其地址为:%p", intChan, &intChan)
 ​
  /*写入数据*/
  intChan <- 10
  num := 211
  intChan <- num
  fmt.Printf("channel len = %v cap = %v", len(intChan), cap(intChan))
  /*取出数据*/
  var num2 int
  num2 = <-intChan
  fmt.Println("num2 = ", num2)
  fmt.Printf("channel len = %v cap = %v", len(intChan), cap(intChan))
 }
 ​
image-20260127222048321

注意:

  1. channel长度是固定的,不是可变的,超过长度继续放入会报错
  2. 取出后可以继续放入
  3. 当channel为空的时候,继续去取出会报错,会报deadlock

遍历和关闭

使用内置函数close去关闭channel,当channel关闭后就不能继续写入数据,但是可以读取数据

支持使用for-range进行遍历,但是要注意,如果chanel没有关闭就会返回dead lock如果已经关闭才会返回争产大哥遍历数据,遍历结束后,退出遍历。

这些比较简单而且好理解,就不做代码演示了

goroutine&channel结合

需求:

  1. 开启一个writeData协程,向管道intChan中写入50个整数。
  2. 开启一个readData协程,从管道intChan中读取writeData写入的数据。
  3. 注意:writeDatareadData操作的是同一个管道。
  4. 主线程需要等待writeDatareadData协程都完成工作才能退出

实现

 package main
 ​
 import "fmt"
 ​
 func writeData(intChan chan int) {
  for i := 0; i <= 50; i++ {
  intChan <- i
  }
  close(intChan)
 }
 func readData(intChan chan int, exitChan chan bool) {
  for {
  v, ok := <-intChan
  if !ok {
  break
  }
  fmt.Printf("readData:%v\n", v)
  }
  exitChan <- true
 }
 func main() {
  intChain := make(chan int, 50)
  exitChain := make(chan bool, 1)
  go writeData(intChain)
  go readData(intChain, exitChain)
  fmt.Println("主线程等待ing")
  <-exitChain
  fmt.Println("所有协程完成,主线程退出")
 }

注意:

在 Go 中,主线程(main 函数)一旦执行完毕,整个程序就会立即退出,不管还有没有其他 goroutine 在运行!

所以,如果你写了两个协程(比如 writeDatareadData),但主线程不等它们,直接结束,那么:

  • 协程可能还没开始运行
  • 或者只运行了一部分
  • 程序就“突然”退出了 → 数据丢失、逻辑不完整

exitChan 是最简单的信号量,用bool值来传递完成状态,确保所有数据被正常处理。

管道阻塞

阻塞的情况:

情况是否阻塞
向无缓冲 channel 写数据,但没有接收者阻塞
从无缓冲 channel 读数据,但没有发送者阻塞
向有缓冲 channel 写数据,缓冲区满阻塞
从有缓冲 channel 读数据,缓冲区空阻塞

解决方案

1.使用带缓冲的channel

 ch := make(chan int, 10) // 缓冲区大小为10

2.使用select处理超时和默认操作

 select {
 case data := <-ch:
     fmt.Println(data)
 case <-time.After(1 * time.Second):
     fmt.Println("超时")
 }

3.使用 close() 正确关闭通道,防止死锁。

综合案例

找出 1 到 N 范围内的所有素数(质数),使用多个协程并发处理。

思路分析:

方法一:暴力法(单线程)
  • 对每个数判断是否为素数。
  • 时间复杂度高:O(n√n)
方法二:并发优化(协程 + 分段)
  1. 将数字范围分段,例如:[1,100], [101,200], …
  2. 每个协程负责一段区间,判断该区间内哪些是素数。
  3. 每个协程将结果写入一个共享 channel。
  4. 主线程收集所有结果。

注意事项:

  • 协程数量不宜过多(建议不超过 CPU 核心数)。
  • 需要控制 goroutine 数量,避免资源耗尽
 package main
 ​
 import (
     "fmt"
     "math"
     "runtime"
     "sync"
 )
 ​
 func isPrime(n int) bool {
     if n < 2 {
         return false
     }
     for i := 2; i <= int(math.Sqrt(float64(n))); i++ {
         if n%i == 0 {
             return false
         }
     }
     return true
 }
 ​
 func findPrimes(start, end int, resultChan chan int, wg *sync.WaitGroup) {
     defer wg.Done()
     for i := start; i <= end; i++ {
         if isPrime(i) {
             resultChan <- i
         }
     }
 }
 ​
 func main() {
     const max = 1000
     numWorkers := runtime.NumCPU() /* 使用 CPU 核心数作为协程数*/
     chunkSize := (max + numWorkers - 1) / numWorkers /*每个协程处理的区间大小*/
     resultChan := make(chan int, max) /* 缓冲通道,防止阻塞*/
     var wg sync.WaitGroup
 ​
     for i := 0; i < numWorkers; i++ {
         start := i*chunkSize + 1
         end := (i+1)*chunkSize
         if end > max {
             end = max
         }
         wg.Add(1)
         go findPrimes(start, end, resultChan, &wg)
     }
 ​
     go func() {
         wg.Wait()
         close(resultChan)
     }()
 ​
     /*收集结果*/
     primes := []int{}
     for prime := range resultChan {
         primes = append(primes, prime)
     }
 ​
     fmt.Printf("1 到 %d 的素数共 %d 个:\n", max, len(primes))
     fmt.Println(primes)
 }

效率测试

 package main
 ​
 import (
     "fmt"
     "math"
     "runtime"
     "sync"
     "time"
 )
 ​
 /*判断是否为素数*/
 func isPrime(n int) bool {
     if n < 2 {
         return false
     }
     for i := 2; i <= int(math.Sqrt(float64(n))); i++ {
         if n%i == 0 {
             return false
         }
     }
     return true
 }
 ​
 /*串行方式求素数*/
 func testSerial(max int) time.Duration {
     start := time.Now()
     var primes []int
     for i := 2; i <= max; i++ {
         if isPrime(i) {
             primes = append(primes, i)
         }
     }
     fmt.Printf("串行方式找到 %d 个素数\n", len(primes))
     return time.Since(start)
 }
 ​
 /*并发方式求素数*/
 func testConcurrent(max int) time.Duration {
     start := time.Now()
 ​
     numWorkers := runtime.NumCPU()
     chunkSize := (max + numWorkers - 1) / numWorkers
 ​
     resultChan := make(chan int, max)
     var wg sync.WaitGroup
 ​
     /*启动多个协程处理不同区间*/
     for i := 0; i < numWorkers; i++ {
         startNum := i*chunkSize + 1
         endNum := (i+1)*chunkSize
         if startNum < 2 {
             startNum = 2 
         }
         if endNum > max {
             endNum = max
         }
         if startNum > endNum {
             continue
         }
 ​
         wg.Add(1)
         go func(start, end int) {
             defer wg.Done()
             for j := start; j <= end; j++ {
                 if isPrime(j) {
                     resultChan <- j
                 }
             }
         }(startNum, endNum)
     }
 ​
     go func() {
         wg.Wait()
         close(resultChan)
     }()
 ​
     count := 0
     for range resultChan {
         count++
     }
     fmt.Printf("并发方式找到 %d 个素数\n", count)
 ​
     return time.Since(start)
 }
 ​
 func main() {
     max := 8000 /* 测试量设为 8000*/
 ​
     fmt.Printf("正在测试...\n")
 ​
     serialTime := testSerial(max)
     concurrentTime := testConcurrent(max)
 ​
     fmt.Println("\n性能对比:")
     fmt.Printf("串行耗时:%v\n", serialTime)
     fmt.Printf("并发耗时:%v\n", concurrentTime)
 ​
 }

使用细节和注意事项

1.channel可以声明为只读,或者只写

类型语法含义
双向chan int可发送也可接收
只写chan<- int只能发送数据
只读<-chan int只能接收数据

案例:

 package main
 ​
 import "fmt"
 ​
 /* send 函数:向 channel 发送数据*/
 func send(ch chan<- int, exitChan chan struct{}) {
     for i := 0; i < 10; i++ {
         ch <- i
     }
     close(ch)
     var a struct{}
     exitChan <- a
 }
 ​
 /*recv 函数:从 channel 接收数据*/
 func recv(ch <-chan int, exitChan chan struct{}) {
     for {
         v, ok := <-ch
         if !ok {
             break
         }
         fmt.Println(v)
     }
     var a struct{}
     exitChan <- a
 }
 ​
 func main() {
     var ch chan int
     ch = make(chan int, 10)
     exitChan := make(chan struct{}, 2)
 ​
     go send(ch, exitChan)
     go recv(ch, exitChan)
 ​
     var total = 0
     for _ = range exitChan {
         total++
         if total == 2 {
             break
         }
     }
     fmt.Println("结束...")
 }
  • send 使用 只写 channel (chan<- int),确保只能发送数据。
  • recv 使用 只读 channel (<-chan int),确保只能接收数据。

通过确保只读只写,这里可以有效防止我们的误操作,注意之类只作为一种属性标识,并不是一个新的类型

2.select解决阻塞

上面讲过了喵。

3.goroutine使用recover,解决协程中出现panic,导致程序崩溃

recover 只能在 defer 函数中生效,并且只能在发生 panic 的 goroutine 中调用。

暂无评论

发送评论 编辑评论


				
上一篇
下一篇