Go 并发模式
介绍
Go语言以其简洁的并发模型而闻名,主要通过goroutines和channels来实现并发。并发模式是解决并发编程中常见问题的设计模式,它们帮助开发者更高效地管理和协调多个并发任务。本文将介绍几种常见的Go并发模式,并通过代码示例和实际案例帮助你理解它们的应用场景。
Go routines和Channels
在Go中,goroutines是轻量级的线程,由Go运行时管理。你可以通过关键字go
启动一个goroutine。Channels则是用于在goroutines之间传递数据的管道。
package main
import (
"fmt"
"time"
)
func printNumbers() {
for i := 1; i <= 5; i++ {
fmt.Println(i)
time.Sleep(500 * time.Millisecond)
}
}
func main() {
go printNumbers() // 启动一个goroutine
time.Sleep(2 * time.Second) // 等待goroutine完成
}
输出:
1
2
3
4
5
在这个例子中,printNumbers
函数在一个新的goroutine中运行,而主goroutine则继续执行。通过time.Sleep
,我们确保主goroutine等待足够长的时间,以便printNumbers
能够完成。
常见的Go并发模式
1. Worker Pool模式
Worker Pool模式用于限制同时运行的goroutines数量,以避免资源耗尽。它通过一个任务队列和一组工作goroutines来实现。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second) // 模拟工作
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动3个worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for a := 1; a <= numJobs; a++ {
<-results
}
}
输出:
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
在这个例子中,我们创建了3个worker goroutines来处理5个任务。每个worker从jobs
channel中接收任务,并将结果发送到results
channel。
2. Fan-out/Fan-in模式
Fan-out/Fan-in模式用于将任务分发给多个goroutines(Fan-out),然后将结果合并到一个channel中(Fan-in)。
package main
import (
"fmt"
"sync"
"time"
)
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func merge(chs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 启动一个goroutine来收集所有channel的结果
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(chs))
for _, c := range chs {
go output(c)
}
// 等待所有goroutines完成
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := producer(1, 2, 3, 4, 5)
// Fan-out: 启动多个goroutines来处理输入
c1 := square(in)
c2 := square(in)
// Fan-in: 合并结果
for n := range merge(c1, c2) {
fmt.Println(n)
}
}
输出:
1
4
9
16
25
在这个例子中,producer
函数生成一系列数字,square
函数计算每个数字的平方。我们启动了两个square
goroutines来处理输入(Fan-out),然后使用merge
函数将结果合并到一个channel中(Fan-in)。
3. Pipeline模式
Pipeline模式用于将多个处理步骤串联起来,每个步骤由一个goroutine处理,并通过channel传递数据。
package main
import (
"fmt"
"time"
)
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 生成数字
gen := generate(1, 2, 3, 4, 5)
// 计算平方
sq := square(gen)
// 打印结果
for n := range sq {
fmt.Println(n)
}
}
输出:
1
4
9
16
25
在这个例子中,generate
函数生成一系列数字,square
函数计算每个数字的平方。通过将这两个步骤串联起来,我们创建了一个简单的pipeline。
实际应用场景
1. Web服务器请求处理
在Web服务器中,每个请求可以作为一个goroutine来处理。通过使用Worker Pool模式,可以限制同时处理的请求数量,避免服务器过载。
2. 数据处理流水线
在数据处理任务中,Pipeline模式可以将数据处理的各个步骤(如数据清洗、转换、分析)串联起来,每个步骤由一个goroutine处理,从而提高处理效率。
3. 并发任务调度
在任务调度系统中,Fan-out/Fan-in模式可以将任务分发给多个worker goroutines,并将结果合并,从而实现高效的并发任务调度。
总结
Go语言提供了强大的并发编程工具,包括goroutines和channels。通过掌握常见的并发模式,如Worker Pool、Fan-out/Fan-in和Pipeline,你可以编写高效且易于维护的并发程序。希望本文的内容能帮助你更好地理解Go并发模式,并在实际项目中应用它们。
附加资源
练习
- 修改Worker Pool模式的代码,使其能够处理更多的任务,并观察worker的行为。
- 尝试实现一个更复杂的Pipeline,包含多个处理步骤(如过滤、映射、归约)。
- 使用Fan-out/Fan-in模式实现一个并发爬虫,将网页内容分发给多个goroutines处理,并将结果合并。
Happy coding! 🚀