Golang 并发示例代码

Channel

模拟业务超时处理

func main() {
	select {
	case <-doWork():
		fmt.Println("任务结束")
	case <-time.After(1 * time.Second):
		fmt.Println("任务处理超时")
	}
}

func doWork() <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		// 任务处理耗时
		time.Sleep(2 * time.Second)
		// 表示任务执行结束
		ch <- struct{}{}
	}()
	return ch
}

模拟 Signal

func main() {
	isOver := make(chan struct{})
	go func() {
		collectMsg(isOver)
	}()
	<-isOver
	calculateMsg()
}

// 采集
func collectMsg(isOver chan struct{}) {
	time.Sleep(500 * time.Millisecond)
	fmt.Println("完成采集工具")
	isOver <- struct{}{}
}

// 计算
func calculateMsg() {
	fmt.Println("开始进行数据分析")
}

Channel 之间传递消息

type token struct{}

func main() {
	num := 4
	var chs []chan token
	// 4 个work
	for i := 0; i < num; i++ {
		chs = append(chs, make(chan token))
	}
	for j := 0; j < num; j++ {
		go worker(j, chs[j], chs[(j+1)%num])
	}
	// 先把令牌交给第一个
	chs[0] <- struct{}{}
	// select {}
	time.Sleep(10 * time.Second)
}

func worker(id int, ch chan token, next chan token) {
	for {
		// 对应work 取得令牌
		token := <-ch
		fmt.Println(id + 1)
		time.Sleep(1 * time.Second)
		// 传递给下一个
		next <- token
	}
}

模拟生产者消费者

func main() {
	ch := make(chan int, 10)
	go consumer(ch)
	time.Sleep(1 * time.Second)
	go producer(ch)
	time.Sleep(3 * time.Second)
}

// 一个生产者
func producer(ch chan int) {
	for i := 0; i < 10; i++ {
		ch <- i
	}
	close(ch)
}

// 消费者
func consumer(task <-chan int) {
	for i := 0; i < 5; i++ {
		// 5个消费者
		go func(id int) {
			for {
				// <-task 阻塞读,如果为 false 说明通道已关闭
				item, ok := <-task
				if !ok {
					return
				}
				fmt.Printf("消费者:%d,消费了:%d\n", id, item)
				// 给别人一点机会不会吃亏
				time.Sleep(50 * time.Millisecond)
			}
		}(i)
	}
}

模拟互斥锁

type ticket struct{}

type Mutex struct {
	ch chan ticket
}

// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
	return &Mutex{ch: make(chan ticket, 1)}
}

// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
	m.ch <- struct{}{}
}

// 解锁就把数据取出
func (m *Mutex) unLock() {
	select {
	case <-m.ch:
	default:
		panic("已经解锁了")
	}
}

func main1() {
	mutex := newMutex()
	go func() {
		// 如果是1先拿到锁,那么2就要等1秒才能拿到锁
		mutex.Lock()
		fmt.Println("任务1拿到锁了")
		time.Sleep(1 * time.Second)
		mutex.unLock()
	}()
	go func() {
		mutex.Lock()
		// 如果是2拿先到锁,那么1就要等2秒才能拿到锁
		fmt.Println("任务2拿到锁了")
		time.Sleep(2 * time.Second)
		mutex.unLock()
	}()
	time.Sleep(500 * time.Millisecond)
	// 用了一点小手段这里最后才能拿到锁
	mutex.Lock()
	mutex.unLock()
	close(mutex.ch)
}

模拟完备的不可重入锁

type Mutex struct {
	ch chan struct{}
}

// 初始化
func NewMutex() *Mutex {
	mu := &Mutex{ch: make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

// 请求锁

func (m *Mutex) Lock() {
	<-m.ch
}

// 解锁
func (m *Mutex) UnLock() {
	select {
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:
	}
	return false
}

// 加入一个超时的设置
func (m *Mutex) LockTimeOut(timeOut time.Duration) bool {
	timer := time.NewTicker(timeOut)
	select {
	case <-m.ch:
		timer.Stop()
		return true
	case <-timer.C:
	}
	return false
}

func (m *Mutex) IsLocked() bool {
	return len(m.ch) == 0
}

func main() {
	m := NewMutex()
	ok := m.TryLock()
	fmt.Printf("locked v %v\n", ok)
	ok = m.TryLock()
	fmt.Printf("locked %v\n", ok)
}

Context

通知取消

使用一个 context 实现通知取消功能,被通知到的协程需要读取 context的状态以判断是否徐亚继续执行

func operation1(ctx context.Context) error {
	// 假设这个操作会因为某种原因失败
	// 使用time.Sleep来模拟一个资源密集型操作
	time.Sleep(100 * time.Millisecond)
	return errors.New("failed")
}

func operation2(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			println("canceled operation2")
			return
		// 注意:省略default分支 整个goroutine 会被阻塞住
		default:
		}

		println("operation2 exec ing")
		// 可以选择在default分支或者是这里执行业务逻辑
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	// 新建一个上下文
	ctx := context.Background()
	// 在初始上下文的基础上创建一个有取消功能的上下文
	ctx, cancel := context.WithCancel(ctx)
	// 在不同的goroutine中运行operation2
	go func() {
		operation2(ctx)
	}()

	err := operation1(ctx)
	// 如果这个操作返回错误,取消所有使用相同上下文的 goroutine 的执行
	if err != nil {
		cancel()
	}
}

超时通知取消

Context 上下文 · Issue #50 · kevinyan815/gocookbook (github.com)

// 这个上下文将会在3秒后被取消
// 如果需要在到期前就取消可以像前面的例子那样使用cancel函数
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
 
// 上下文将在2009-11-10 23:00:00被取消
ctx, cancel := context.WithDeadline(ctx, time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))

worker goroutine 内部要跟上面的例子一样,通过 select case <- Done() 接收通道传递过来的取消信号。唯一的区别是这两种Context是到期自动同步信号,不需要在 main goroutine 内主动触发

如果想在到期前提前让 worker goroutine 结束执行,调用创建Context时返回的 cancel 函数。

使用WaitGroup, Channel和Context打造一个并发用户标签查询器

使用WaitGroup, Channel和Context打造一个并发用户标签查询器 · Issue #21 · kevinyan815/gocookbook (github.com)

简单多任务分解

一个把Context,Timer,Goroutine和Channel结合起来的例子 · Issue #18 · kevinyan815/gocookbook (github.com)

用多个goroutine在一个超级长的切片中查找目标整数,限时5秒。在找到目标值或者超时后立刻结束所有goroutine的执行。

func main() {
	timer := time.NewTimer(time.Second * 5)
	data := []int{1, 2, 3, 10, 999, 8, 345, 7, 98, 33, 66, 77, 88, 68, 96}
	dataLen := len(data)
	size := 3
	target := 345
	ctx, cancel := context.WithCancel(context.Background())
	resultChan := make(chan bool)
	for i := 0; i < dataLen; i += size {
		// 划定查询范围
		end := i + size
		if end >= dataLen {
			end = dataLen - 1
		}
		go SearchTarget(ctx, data[i:end], target, resultChan)
	}
	select {
	case <-timer.C:
		fmt.Fprintln(os.Stderr, "TimeOut! Not found")
		cancel()
	case <-resultChan:
		fmt.Fprintln(os.Stdout, "Found It")
		cancel()
	}
	time.Sleep(20)
}

func SearchTarget(ctx context.Context, data []int, target int, resultChan chan bool) {
	for _, v := range data {
		select {
		case <-ctx.Done():
			fmt.Fprintf(os.Stdout, "Task Cancelded~~\n")
			return
		default:
		}
		fmt.Fprintf(os.Stdout, "v: %d\n", v)
		time.Sleep(time.Millisecond * 1500)
		if target == v {
			resultChan <- true
			return
		}
	}
}

如何实现超时之后取消任务?

1、在主协程中,监控 timer 是否超时,如果超时,执行 context 提供的 cancel 函数

select {
case <-timer.C:
	fmt.Fprintln(os.Stderr, "TimeOut! Not found")
	cancel()
case <-resultChan:
	fmt.Fprintln(os.Stdout, "Found It")
	cancel()
}

2、在任务协程中:每次执行下一次查找时查看 context 存在

select {
case <-ctx.Done():
	fmt.Fprintf(os.Stdout, "Task Cancelded~~\n")
	return
default:
}

任务协程是如何向主协程报告找到 target 的?

使用一个 channel