1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| package main
import ( "log" "math/rand" "os" "os/signal" "syscall" "time" )
/* classic pipeline demo write by perrynzhou@gmail.com */ const ( batchSize = 8 )
/* note: for range 在chan上有如下特性 1.如果chan上有数据,则for 继续往下执行,如果chan没有数据则for 会阻塞 2.如果chan被close了,则chan为nil,for range会退出循环。 */ type PipeFeature struct { input1 chan int64 input2 chan int64 input3 chan int64 done chan struct{} stop chan struct{} }
func NewPipeFeature() *PipeFeature { return &PipeFeature{ input1: make(chan int64, batchSize), input2: make(chan int64, batchSize), input3: make(chan int64, batchSize), done: make(chan struct{}), stop: make(chan struct{}), } } func (p *PipeFeature) init() { log.Println("...init running...") defer close(p.input1) for { select { case <-p.done: log.Println("...init stop...") return default: time.Sleep(5 * time.Millisecond) p.input1 <- rand.Int63n(65535) } } } func (p *PipeFeature) stage1() { log.Println("...stage1 running...") defer close(p.input2) for v := range p.input1 { //will block util input1 close v = v - rand.Int63n(1024) p.input2 <- v } log.Println("stage1 done...") } func (p *PipeFeature) stage2() { log.Println("...stage2 running...") defer close(p.input3) for v := range p.input2 { v = v + 1 p.input3 <- v } log.Println("stage2 done...") } func (p *PipeFeature) stage3() { log.Println("...stage3 running...") for v3 := range p.input3 { //will block v3 = v3 + rand.Int63n(100) } log.Println("stage3 done...") } func (p *PipeFeature) Run() { log.Println("start pipeline...") go p.init() //order2- recv data from done and closed input1, return this function go p.stage1() order 3-if input1 is closed,break for loop, and close input2 before return go p.stage2() //order 4-if input2 is closed ,break for range input2 and close input3 before return // order 5- if input3 is closed,stage3 return p.stage3() // will block util input3 closed after call stage2 p.stop <- struct{}{} // order 6-send stop flag to stop chan before end Run function } func (p *PipeFeature) Stop() { p.done <- struct{}{} // order 1-let init function to stop //order 7 - already recv data from stop chan <-p.stop //wait for recv stop chan log.Println("stop pipeline...") } func main() { pipe := NewPipeFeature() defer pipe.Stop() sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) go pipe.Run() for { select { case <-sigs: log.Println("recieve stop signal") return } } }
|