跳到主要内容

Go 并发模式

介绍

Go语言以其简洁的并发模型而闻名,主要通过goroutines和channels来实现并发。并发模式是解决并发编程中常见问题的设计模式,它们帮助开发者更高效地管理和协调多个并发任务。本文将介绍几种常见的Go并发模式,并通过代码示例和实际案例帮助你理解它们的应用场景。

Go routines和Channels

在Go中,goroutines是轻量级的线程,由Go运行时管理。你可以通过关键字go启动一个goroutine。Channels则是用于在goroutines之间传递数据的管道。

go
package main

import (
"fmt"
"time"
)

func printNumbers() {
for i := 1; i <= 5; i++ {
fmt.Println(i)
time.Sleep(500 * time.Millisecond)
}
}

func main() {
go printNumbers() // 启动一个goroutine
time.Sleep(2 * time.Second) // 等待goroutine完成
}

输出:

1
2
3
4
5

在这个例子中,printNumbers函数在一个新的goroutine中运行,而主goroutine则继续执行。通过time.Sleep,我们确保主goroutine等待足够长的时间,以便printNumbers能够完成。

常见的Go并发模式

1. Worker Pool模式

Worker Pool模式用于限制同时运行的goroutines数量,以避免资源耗尽。它通过一个任务队列和一组工作goroutines来实现。

go
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second) // 模拟工作
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}

func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)

// 启动3个worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}

// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// 收集结果
for a := 1; a <= numJobs; a++ {
<-results
}
}

输出:

worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

在这个例子中,我们创建了3个worker goroutines来处理5个任务。每个worker从jobs channel中接收任务,并将结果发送到results channel。

2. Fan-out/Fan-in模式

Fan-out/Fan-in模式用于将任务分发给多个goroutines(Fan-out),然后将结果合并到一个channel中(Fan-in)。

go
package main

import (
"fmt"
"sync"
"time"
)

func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func merge(chs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 启动一个goroutine来收集所有channel的结果
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}

wg.Add(len(chs))
for _, c := range chs {
go output(c)
}

// 等待所有goroutines完成
go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
in := producer(1, 2, 3, 4, 5)

// Fan-out: 启动多个goroutines来处理输入
c1 := square(in)
c2 := square(in)

// Fan-in: 合并结果
for n := range merge(c1, c2) {
fmt.Println(n)
}
}

输出:

1
4
9
16
25

在这个例子中,producer函数生成一系列数字,square函数计算每个数字的平方。我们启动了两个square goroutines来处理输入(Fan-out),然后使用merge函数将结果合并到一个channel中(Fan-in)。

3. Pipeline模式

Pipeline模式用于将多个处理步骤串联起来,每个步骤由一个goroutine处理,并通过channel传递数据。

go
package main

import (
"fmt"
"time"
)

func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func main() {
// 生成数字
gen := generate(1, 2, 3, 4, 5)

// 计算平方
sq := square(gen)

// 打印结果
for n := range sq {
fmt.Println(n)
}
}

输出:

1
4
9
16
25

在这个例子中,generate函数生成一系列数字,square函数计算每个数字的平方。通过将这两个步骤串联起来,我们创建了一个简单的pipeline。

实际应用场景

1. Web服务器请求处理

在Web服务器中,每个请求可以作为一个goroutine来处理。通过使用Worker Pool模式,可以限制同时处理的请求数量,避免服务器过载。

2. 数据处理流水线

在数据处理任务中,Pipeline模式可以将数据处理的各个步骤(如数据清洗、转换、分析)串联起来,每个步骤由一个goroutine处理,从而提高处理效率。

3. 并发任务调度

在任务调度系统中,Fan-out/Fan-in模式可以将任务分发给多个worker goroutines,并将结果合并,从而实现高效的并发任务调度。

总结

Go语言提供了强大的并发编程工具,包括goroutines和channels。通过掌握常见的并发模式,如Worker Pool、Fan-out/Fan-in和Pipeline,你可以编写高效且易于维护的并发程序。希望本文的内容能帮助你更好地理解Go并发模式,并在实际项目中应用它们。

附加资源

练习

  1. 修改Worker Pool模式的代码,使其能够处理更多的任务,并观察worker的行为。
  2. 尝试实现一个更复杂的Pipeline,包含多个处理步骤(如过滤、映射、归约)。
  3. 使用Fan-out/Fan-in模式实现一个并发爬虫,将网页内容分发给多个goroutines处理,并将结果合并。

Happy coding! 🚀