Go 协程池(goroutine pool)
package main
import (
"fmt"
"sync"
)
// 任务结构体
type Task struct {
ID int // 任务ID
}
// 协程池结构体
type Pool struct {
WorkerNum int // 协程池中的协程数量
TaskQueue chan *Task // 任务队列
WG sync.WaitGroup // 用于等待所有协程完成任务
StopChannel chan struct{} // 停止信号通道
}
// 创建新的协程池
func NewPool(workerNum, queueSize int) *Pool {
return &Pool{
WorkerNum: workerNum,
TaskQueue: make(chan *Task, queueSize),
StopChannel: make(chan struct{}),
}
}
// 开始执行任务
func (p *Pool) Start() {
// 创建指定数量的协程
for i := 0; i < p.WorkerNum; i++ {
go p.worker()
}
}
// 添加任务到任务队列
func (p *Pool) AddTask(task *Task) {
p.TaskQueue <- task
}
// 关闭协程池
func (p *Pool) Shutdown() {
close(p.StopChannel)
p.WG.Wait()
}
// 协程执行的具体任务
func (p *Pool) worker() {
// 增加等待组计数
p.WG.Add(1)
// 循环监听任务队列和停止信号通道
for {
select {
case task := <-p.TaskQueue:
// 执行任务
fmt.Printf("Worker %d is processing task %d\n", p.WorkerNum, task.ID)
case <-p.StopChannel:
// 收到停止信号,结束协程
p.WG.Done()
return
}
}
}
func main() {
pool := NewPool(3, 10)
pool.Start()
// 添加一些任务到协程池
for i := 0; i < 5; i++ {
task := &Task{ID: i}
pool.AddTask(task)
}
// 关闭协程池
pool.Shutdown()
}Last updated