在前面有介紹過了 golang 的重點之一 goroutine ,golang 的設計讓 concurrency 非常容易,但是大家有沒有想過,goroutine 也不是什麼黑科技,最終電腦還是有所謂的 CPU & MEM 上限,所以也就意味著 goroutine 不能無窮無境的開,那今天就來討論 Concurrency Pattern,到底該如何設計。

接下來,我們假設一個情境,我們需要在某個 http handler ,加入統計 request 相關的方法,但是又不希望它影響這個 api response 的速度,所以我們一定是要用 goroutine 把統計的相關 func ,丟到背景平行處理。就像下面一樣


//模擬需要統計 reqesut 相關數據的
func Statistic(r *http.Request) {
}

func main() {

    http.HandleFunc("/api/query", func(w http.ResponseWriter, r *http.Request) {

        //假設需要統計 request 相關數據,所以丟背景
        go Statistic(r)

        u := &UserInfo{
            Name: "syhlion",
            Age:  18,
        }
        b, err := json.Marshal(u)
        if err != nil {
            log.Println(err)
            return
        }
        w.Header().Set("Content-Type", "application/json;charset=UTF-8")
        w.WriteHeader(http.StatusOK)
        w.Write(b)
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}

但是如果照上面所寫的,如果 http request 一多,是不是 goroutine 就無窮無境的開下去了….,那這樣就在比到底是先碰上 port 不夠的問題,還是 CPU & MEM 先被耗盡。最終就是被 OS 層強制關閉 application。

在這方面上的設計,我個人是有一個原則,『當存取的人數多時,服務可以慢,但不能死』,所以當我們存取人數多時,寧可少收幾個客戶,也不能硬要多收幾個客戶,讓整個系統崩潰。

接下來我們來構思一下改良策略,首先我們需要一個 worker 派發中心。這時侯就能善用 golang 的特性 channel,運用 channel 當作 quene ,在當配 goroutine,就能做出一個派工的模型。

concurrency-pattern

依據上圖,我們的實作方式如下

type StatisticWorker struct {
	quene chan *http.Request
	num   int
}

//統一由這邊做 requset 加入的動作
func (s *StatisticWorker) AddRequest(r *http.Request) (err error) {
	select {
	case s.quene <- r:
	//當quene已滿時,則把 err 丟出,
	default:
		err = errors.New("buffer full")
	}
	return

}
//實際worker本體 運作邏輯的地方
func (s *StatisticWorker) Statistic() {
	for r := range s.quene {
		//處理 request 相關統計
	}
}

//初始化到底要有幾個worker
func (s *StatisticWorker) Run() {
	for i := 0; i < s.num; i++ {
		go Statistic()
	}
}

這樣子我們就能看到一個完整 worker,可以依你需求去控制最大收 request 的統計 worker。

完整的使用範例如下

type StatisticWorker struct {
	quene chan *http.Request
	num   int
}

//統一由這邊做 requset 加入的動作
func (s *StatisticWorker) AddRequest(r *http.Request) (err error) {
	select {
	case s.quene <- r:
	//當quene已滿時,則把 err 丟出,
	default:
		err = errors.New("buffer full")
	}
	return

}
//實際worker本體 運作邏輯的地方
func (s *StatisticWorker) Statistic() {
	for r := range s.quene {
		//處理 request 相關統計
	}
}

//初始化到底要有幾個worker
func (s *StatisticWorker) Run() {
	for i := 0; i < s.num; i++ {
		go Statistic()
	}
}
func main() {
    sw := &StatisticWorker{
		quene: make(chan *http.Request, 1000),
		num:   100,
	}
	go sw.Run()

    http.HandleFunc("/api/query", func(w http.ResponseWriter, r *http.Request) {

        //假設需要統計 request 相關數據,所以丟背景
        //這邊使用這個非阻塞式的worker
        err := sw.Add(r)
        if err != nil {
            log.Println(err)
            return
        }

        u := &UserInfo{
            Name: "syhlion",
            Age:  18,
        }
        b, err := json.Marshal(u)
        if err != nil {
            log.Println(err)
            return
        }
        w.Header().Set("Content-Type", "application/json;charset=UTF-8")
        w.WriteHeader(http.StatusOK)
        w.Write(b)
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}