Channel
模拟业务超时处理
func main() {
select {
case <-doWork():
fmt.Println("任务结束")
case <-time.After(1 * time.Second):
fmt.Println("任务处理超时")
}
}
func doWork() <-chan struct{} {
ch := make(chan struct{})
go func() {
// 任务处理耗时
time.Sleep(2 * time.Second)
// 表示任务执行结束
ch <- struct{}{}
}()
return ch
}
模拟 Signal
func main() {
isOver := make(chan struct{})
go func() {
collectMsg(isOver)
}()
<-isOver
calculateMsg()
}
// 采集
func collectMsg(isOver chan struct{}) {
time.Sleep(500 * time.Millisecond)
fmt.Println("完成采集工具")
isOver <- struct{}{}
}
// 计算
func calculateMsg() {
fmt.Println("开始进行数据分析")
}
Channel 之间传递消息
type token struct{}
func main() {
num := 4
var chs []chan token
// 4 个work
for i := 0; i < num; i++ {
chs = append(chs, make(chan token))
}
for j := 0; j < num; j++ {
go worker(j, chs[j], chs[(j+1)%num])
}
// 先把令牌交给第一个
chs[0] <- struct{}{}
// select {}
time.Sleep(10 * time.Second)
}
func worker(id int, ch chan token, next chan token) {
for {
// 对应work 取得令牌
token := <-ch
fmt.Println(id + 1)
time.Sleep(1 * time.Second)
// 传递给下一个
next <- token
}
}
模拟生产者消费者
func main() {
ch := make(chan int, 10)
go consumer(ch)
time.Sleep(1 * time.Second)
go producer(ch)
time.Sleep(3 * time.Second)
}
// 一个生产者
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
// 消费者
func consumer(task <-chan int) {
for i := 0; i < 5; i++ {
// 5个消费者
go func(id int) {
for {
// <-task 阻塞读,如果为 false 说明通道已关闭
item, ok := <-task
if !ok {
return
}
fmt.Printf("消费者:%d,消费了:%d\n", id, item)
// 给别人一点机会不会吃亏
time.Sleep(50 * time.Millisecond)
}
}(i)
}
}
模拟互斥锁
type ticket struct{}
type Mutex struct {
ch chan ticket
}
// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
return &Mutex{ch: make(chan ticket, 1)}
}
// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
m.ch <- struct{}{}
}
// 解锁就把数据取出
func (m *Mutex) unLock() {
select {
case <-m.ch:
default:
panic("已经解锁了")
}
}
func main1() {
mutex := newMutex()
go func() {
// 如果是1先拿到锁,那么2就要等1秒才能拿到锁
mutex.Lock()
fmt.Println("任务1拿到锁了")
time.Sleep(1 * time.Second)
mutex.unLock()
}()
go func() {
mutex.Lock()
// 如果是2拿先到锁,那么1就要等2秒才能拿到锁
fmt.Println("任务2拿到锁了")
time.Sleep(2 * time.Second)
mutex.unLock()
}()
time.Sleep(500 * time.Millisecond)
// 用了一点小手段这里最后才能拿到锁
mutex.Lock()
mutex.unLock()
close(mutex.ch)
}
模拟完备的不可重入锁
type Mutex struct {
ch chan struct{}
}
// 初始化
func NewMutex() *Mutex {
mu := &Mutex{ch: make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 请求锁
func (m *Mutex) Lock() {
<-m.ch
}
// 解锁
func (m *Mutex) UnLock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// 加入一个超时的设置
func (m *Mutex) LockTimeOut(timeOut time.Duration) bool {
timer := time.NewTicker(timeOut)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
fmt.Printf("locked v %v\n", ok)
ok = m.TryLock()
fmt.Printf("locked %v\n", ok)
}
Context
通知取消
使用一个 context 实现通知取消功能,被通知到的协程需要读取 context的状态以判断是否徐亚继续执行
func operation1(ctx context.Context) error {
// 假设这个操作会因为某种原因失败
// 使用time.Sleep来模拟一个资源密集型操作
time.Sleep(100 * time.Millisecond)
return errors.New("failed")
}
func operation2(ctx context.Context) {
for {
select {
case <-ctx.Done():
println("canceled operation2")
return
// 注意:省略default分支 整个goroutine 会被阻塞住
default:
}
println("operation2 exec ing")
// 可以选择在default分支或者是这里执行业务逻辑
time.Sleep(10 * time.Millisecond)
}
}
func main() {
// 新建一个上下文
ctx := context.Background()
// 在初始上下文的基础上创建一个有取消功能的上下文
ctx, cancel := context.WithCancel(ctx)
// 在不同的goroutine中运行operation2
go func() {
operation2(ctx)
}()
err := operation1(ctx)
// 如果这个操作返回错误,取消所有使用相同上下文的 goroutine 的执行
if err != nil {
cancel()
}
}
超时通知取消
Context 上下文 · Issue #50 · kevinyan815/gocookbook (github.com)
// 这个上下文将会在3秒后被取消
// 如果需要在到期前就取消可以像前面的例子那样使用cancel函数
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
// 上下文将在2009-11-10 23:00:00被取消
ctx, cancel := context.WithDeadline(ctx, time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))
worker goroutine 内部要跟上面的例子一样,通过 select case <- Done() 接收通道传递过来的取消信号。唯一的区别是这两种Context是到期自动同步信号,不需要在 main goroutine 内主动触发 。
如果想在到期前提前让 worker goroutine 结束执行,调用创建Context时返回的 cancel 函数。
使用WaitGroup, Channel和Context打造一个并发用户标签查询器
使用WaitGroup, Channel和Context打造一个并发用户标签查询器 · Issue #21 · kevinyan815/gocookbook (github.com)
简单多任务分解
一个把Context,Timer,Goroutine和Channel结合起来的例子 · Issue #18 · kevinyan815/gocookbook (github.com)
用多个goroutine在一个超级长的切片中查找目标整数,限时5秒。在找到目标值或者超时后立刻结束所有goroutine的执行。
func main() {
timer := time.NewTimer(time.Second * 5)
data := []int{1, 2, 3, 10, 999, 8, 345, 7, 98, 33, 66, 77, 88, 68, 96}
dataLen := len(data)
size := 3
target := 345
ctx, cancel := context.WithCancel(context.Background())
resultChan := make(chan bool)
for i := 0; i < dataLen; i += size {
// 划定查询范围
end := i + size
if end >= dataLen {
end = dataLen - 1
}
go SearchTarget(ctx, data[i:end], target, resultChan)
}
select {
case <-timer.C:
fmt.Fprintln(os.Stderr, "TimeOut! Not found")
cancel()
case <-resultChan:
fmt.Fprintln(os.Stdout, "Found It")
cancel()
}
time.Sleep(20)
}
func SearchTarget(ctx context.Context, data []int, target int, resultChan chan bool) {
for _, v := range data {
select {
case <-ctx.Done():
fmt.Fprintf(os.Stdout, "Task Cancelded~~\n")
return
default:
}
fmt.Fprintf(os.Stdout, "v: %d\n", v)
time.Sleep(time.Millisecond * 1500)
if target == v {
resultChan <- true
return
}
}
}
如何实现超时之后取消任务?
1、在主协程中,监控 timer 是否超时,如果超时,执行 context 提供的 cancel 函数
select {
case <-timer.C:
fmt.Fprintln(os.Stderr, "TimeOut! Not found")
cancel()
case <-resultChan:
fmt.Fprintln(os.Stdout, "Found It")
cancel()
}
2、在任务协程中:每次执行下一次查找时查看 context 存在
select {
case <-ctx.Done():
fmt.Fprintf(os.Stdout, "Task Cancelded~~\n")
return
default:
}
任务协程是如何向主协程报告找到 target 的?
使用一个 channel