0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何用Worker pool解決異步任務(wù)的問題

Linux愛好者 ? 來源:blog.xizhibei.me ? 作者:blog.xizhibei.me ? 2022-06-08 14:58 ? 次閱讀

【導(dǎo)讀】本文介紹了 Go 移步任務(wù)隊列的實現(xiàn)。

在一些常見的場景中,如果遇到了某些請求特別耗時間,為了不影響其它用戶的請求以及節(jié)約服務(wù)器資源,我們通常會考慮使用異步任務(wù)隊列去解決,這樣可以快速地處理請求、只返回給用戶任務(wù)創(chuàng)建結(jié)果,等待任務(wù)完成之后,我們再告知用戶任務(wù)的完成情況。

對于 Golang,我們可以通過 Worker pool 異步處理任務(wù),在大多數(shù)情況下,如果不在意數(shù)據(jù)丟失以及服務(wù)器性能足夠,我們就沒有必要考慮別的方案,畢竟這樣實現(xiàn)非常簡單。

接下來我們先來說說如何用 Worker pool 解決異步任務(wù)的問題。

Worker pool

Worker pool,也有叫做 Goroutine pool 的,都是指用 Go channel 以及 Goroutine 來實現(xiàn)的任務(wù)隊列。Go channel 本質(zhì)上就是一個隊列,因此用作任務(wù)隊列是很自然的。

在我們不用 Go channel 的時候,我們也許會使用這樣的方式來處理異步任務(wù):

fori:=0;i100;i++{
gofunc(){
//processjob
}()
}

這樣的方式是不推薦的,因為在請求量到達(dá)一定程度,系統(tǒng)處理不過來的時候,會造成 Goroutine 的爆炸,拖慢整個系統(tǒng)的運(yùn)行,甚至程序的崩潰,數(shù)據(jù)也就完全丟失了。

如果我們用簡單的方式,可以看看接下來的例子:一個發(fā)送者(也叫做生產(chǎn)者),一個接受者(也叫做消費(fèi)者,或者 Worker):

typeJobstruct{...}
jobChan:=make(chanJob)
quit:=make(chanbool)
gofunc(){
select{
casejob:=<-jobChan:
???case<-?quit:
???return
}
}()

fori:=0;i100;i++{
jobChan<-?Job{...}
}
close(jobChan)
quit<-?true

如果 Worker 不夠,我們可以增加,這樣可以并行處理任務(wù):

fori:=0;i10;i++{
gofunc(){
forjob:=rangejobChan{
//processjob
}
}()
}

這樣,一個非常簡單的 Worker pool 就完成了,只是,它對任務(wù)的處理還會有問題,比如無法設(shè)置超時、無法處理 panic 錯誤等。

實際上,目前已經(jīng)有很多的開源庫可以幫你實現(xiàn)了,以worker pool為關(guān)鍵詞在 GitHub 上可以搜到一大堆:

  • GitHub - Jeffail/tunny: A goroutine pool for Go
  • GitHub - gammazero/workerpool: Concurrency limiting goroutine pool

那么,它們的缺點呢?

很明顯,它們的缺點就在于缺乏管理,可以說是完全不管任務(wù)的結(jié)果,即使我們加日志輸出也只是為了簡單監(jiān)控,更要命的就是進(jìn)程重啟的時候,比如進(jìn)程掛了,或者程序更新,都會導(dǎo)致數(shù)據(jù)丟失,畢竟生產(chǎn)者與消費(fèi)者在一個進(jìn)程中的時候,會互相影響(搶占 CPU 與內(nèi)存資源)。因此前面我也說了,在不管這兩個問題的時候,可以考慮用。

如果數(shù)據(jù)很重要(實際上,我認(rèn)為用戶上傳的業(yè)務(wù)數(shù)據(jù)都重要,不能丟失),為了解決這些問題,我們必須換一種解決方案。

分布式異步任務(wù)隊列

接下來再說說異步的分布式任務(wù)隊列,要用到這個工具的時候,我們大致有以下幾個需求:

  • 分布式:生產(chǎn)者與消費(fèi)者隔離;
  • 數(shù)據(jù)持久化:在程序重啟的時候,不丟失已有的數(shù)據(jù);
  • 任務(wù)重試:會有任務(wù)偶然失敗的場景,重試是最簡單的方式,但需要保證任務(wù)的執(zhí)行時是冪等的;
  • 任務(wù)延時:延遲執(zhí)行,比如 5 分鐘后給用戶發(fā)紅包;
  • 任務(wù)結(jié)果的臨時存儲,可用于儲存;
  • 任務(wù)處理情況監(jiān)控:及時發(fā)現(xiàn)任務(wù)執(zhí)行出錯情況;

對于 Python 來說,有個大名鼎鼎的 Celery(https://github.com/celery/celery),它完全包含上面的功能。它包含兩個比較重要的組件:一個是消息隊列,比如 Redis/RabbitMQ 等,Celery中叫做Broker,然后還需要有數(shù)據(jù)庫,用于存儲任務(wù)狀態(tài),叫做Result Backend。

顯然對于 Go 也有很多不錯的開源庫,其中一個學(xué) Celery 的是 Machinery(github.com/RichardKnop/machinery),它目前能滿足大部分需求,而且一直在積極維護(hù),也是我們團(tuán)隊目前在用的。

它目前支持的 Broker 有 AMQP(RabbitMQ)、Redis、AWS SQS、GCP Pub/Sub,目前對國內(nèi)同行來說,RabbitMQ 或者 Redis 會相對比較合適。

另外它還支持幾個高級用法:

  1. Groups:允許你定義多個并行的任務(wù),在最后取任務(wù)結(jié)果的時候,可以一起返回;
  2. Chords:允許你定義一個回調(diào)任務(wù),在 Group 任務(wù)執(zhí)行完畢后執(zhí)行對應(yīng)的回調(diào)任務(wù);
  3. Chains:允許你定義串行執(zhí)行的任務(wù),任務(wù)將會被串行執(zhí)行;

說了優(yōu)點,再說說它的缺點:

  1. 任務(wù)監(jiān)控支持不夠,目前只有分布式追蹤 opentracing 的支持,假如我要使用 prometheus,會比較困難,它的自定義錯誤處理過于簡單,連上下文都不給你;
  2. 傳入的參數(shù)目前只支持非常簡單的參數(shù),不支持 struct、map,還得定義參數(shù)的類型,這樣的方式會將這個庫限制在 Golang 世界中,而無法拓展適用于其它語言;

P.S.

其實對于 Goroutine 的方案,在以下兩種情況下,可以考慮使用:

  1. 必須同步返回給用戶請求結(jié)果;
  2. 服務(wù)器資源足夠,僅僅用 Worker pool 就能降低請求的響應(yīng)時長到可接受范圍;

這兩種方案都會返回請求結(jié)果,失敗的情況下靠客戶端重新請求來解決數(shù)據(jù)丟失的問題。

原文標(biāo)題:Golang 中的異步任務(wù)隊列

文章出處:【微信公眾號:Linux愛好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

審核編輯:湯梓紅
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • Channel
    +關(guān)注

    關(guān)注

    0

    文章

    31

    瀏覽量

    11749
  • 異步
    +關(guān)注

    關(guān)注

    0

    文章

    62

    瀏覽量

    18024
  • Worker
    +關(guān)注

    關(guān)注

    0

    文章

    8

    瀏覽量

    6448

原文標(biāo)題:Golang 中的異步任務(wù)隊列

文章出處:【微信號:LinuxHub,微信公眾號:Linux愛好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    Spring Boot如何實現(xiàn)異步任務(wù)

    Spring Boot 提供了多種方式來實現(xiàn)異步任務(wù),這里介紹三種主要實現(xiàn)方式。 1、基于注解 @Async @Async 注解是 Spring 提供的一種輕量級異步方法實現(xiàn)方式,它可以標(biāo)記在方法上
    的頭像 發(fā)表于 09-30 10:32 ?1322次閱讀

    鴻蒙原生應(yīng)用開發(fā)-ArkTS語言基礎(chǔ)類庫多線程TaskPool和Worker的對比(一)

    TaskPool(任務(wù)池)和Worker的作用是為應(yīng)用程序提供一個多線程的運(yùn)行環(huán)境,用于處理耗時的計算任務(wù)或其他密集型任務(wù)??梢杂行У乇苊膺@些任務(wù)
    發(fā)表于 03-25 14:11

    鴻蒙原生應(yīng)用開發(fā)-ArkTS語言基礎(chǔ)類庫多線程TaskPool和Worker的對比(二)

    易用,支持任務(wù)的執(zhí)行、取消。工作線程數(shù)量上限為4。 Worker運(yùn)作機(jī)制 圖2 Worker運(yùn)作機(jī)制示意圖 創(chuàng)建Worker的線程稱為宿主線程(不一定是主線程,工作線程也支持創(chuàng)建
    發(fā)表于 03-26 15:25

    鴻蒙原生應(yīng)用開發(fā)-ArkTS語言基礎(chǔ)類庫多線程TaskPool和Worker的對比(三)

    一、TaskPool注意事項 實現(xiàn)任務(wù)的函數(shù)需要使用裝飾器@Concurrent標(biāo)注,且僅支持在.ets文件中使用。 實現(xiàn)任務(wù)的函數(shù)入?yún)⑿铦M足序列化支持的類型。 由于不同線程中上下文對象
    發(fā)表于 03-27 16:26

    HarmonyOS CPU與I/O密集型任務(wù)開發(fā)指導(dǎo)

    。 基于多線程并發(fā)機(jī)制處理CPU密集型任務(wù)可以提高CPU利用率,提升應(yīng)用程序響應(yīng)速度。 當(dāng)進(jìn)行一系列同步任務(wù)時,推薦使用Worker;而進(jìn)行大量或調(diào)度點較為分散的獨(dú)立任務(wù)時,不方便使用
    發(fā)表于 09-26 16:29

    何用VxWorks的信號量機(jī)制實現(xiàn)任務(wù)同步

    何用VxWorks的信號量機(jī)制實現(xiàn)任務(wù)同步
    發(fā)表于 03-29 12:25 ?16次下載

    面向?qū)ο笄度胧綄崟r操作系統(tǒng)Worker1.0

    Worker1.0繼承圖1 Worker1.0主要類的簡介3 Worker1.0 API4 Worker1.0移植9 Worker1.0例程
    發(fā)表于 04-29 18:01 ?39次下載
    面向?qū)ο笄度胧綄崟r操作系統(tǒng)<b class='flag-5'>Worker</b>1.0

    Android異步任務(wù)處理

    移動護(hù)理系統(tǒng)開發(fā)采用異步處理的方式,可以縮短執(zhí)行操作的時間,避免UI線程阻塞。筆者介紹了采用異步處理方式開發(fā)移動護(hù)理程序的方法,并以移動護(hù)理中的病人列表異步任務(wù)處理為
    發(fā)表于 12-30 10:39 ?3680次閱讀

    詳解移動通信領(lǐng)域里的組POOL

    在移動通信領(lǐng)域,我們經(jīng)常會提到Pool的概念。Pool,通常譯為水塘、水池。在移動通信中POOL通稱為“池”
    的頭像 發(fā)表于 03-19 16:15 ?7779次閱讀
    詳解移動通信領(lǐng)域里的組<b class='flag-5'>POOL</b>

    normal worker_pool詳細(xì)的創(chuàng)建過程代碼分析

    默認(rèn) work 是在 normal worker_pool 中處理的。系統(tǒng)的規(guī)劃是每個 CPU 創(chuàng)建兩個 normal worker_pool:一個 normal 優(yōu)先級 (nice=0)、一個高
    的頭像 發(fā)表于 04-08 14:35 ?7218次閱讀
    normal <b class='flag-5'>worker_pool</b>詳細(xì)的創(chuàng)建過程代碼分析

    為何需要CMWQ?CMWQ如何解決問題的呢?

    基于這樣的思考,在CMWQ中,將這種固定的關(guān)系被打破,提出了worker pool這樣的概念(其實就是一種thread pool的概念),也就是說,系統(tǒng)中存在若干worker
    的頭像 發(fā)表于 08-20 14:47 ?5279次閱讀

    介紹一種基于任務(wù)異步模式TAP

    TAP是基于任務(wù)異步模式,在 .NET Framework 4 中引入。TAP是 APM 和 EAP,是推薦的模式模式。
    的頭像 發(fā)表于 08-19 11:45 ?2551次閱讀

    ModBus Pool下載

    ModBus Pool下載
    發(fā)表于 10-08 09:41 ?6次下載

    Tokio中hang死所有worker的方法

    原因是 tokio 里的待執(zhí)行 task 不是簡單的放到一個 queue 里,除了 runtime 內(nèi)共享的,可被每個 worker 消費(fèi)的run_queue[2],每個 worker 還有一個自己的 lifo_slot[3],只存儲一個最后被放入的 task (目的是減
    的頭像 發(fā)表于 02-03 16:26 ?925次閱讀

    鴻蒙語言基礎(chǔ)類庫:ohos.worker 啟動一個Worker

    Worker是與主線程并行的獨(dú)立線程。創(chuàng)建Worker的線程稱之為宿主線程,Worker自身的線程稱之為Worker線程。創(chuàng)建Worker
    的頭像 發(fā)表于 07-11 17:03 ?321次閱讀
    鴻蒙語言基礎(chǔ)類庫:ohos.<b class='flag-5'>worker</b> 啟動一個<b class='flag-5'>Worker</b>