久久国产乱子伦精品免费M,亚洲一区二区三区91,欧美国产在线视频,国产精品视频久久

使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

1. 介紹

Asynq 是一個(gè) Go 庫,用于對(duì)任務(wù)進(jìn)行排隊(duì)并與工作人員異步處理它們。

它的工作原理:

  • 客戶端將任務(wù)放入隊(duì)列
  • 服務(wù)器從隊(duì)列中拉出任務(wù)并為每個(gè)任務(wù)啟動(dòng)一個(gè)工作 goroutine
  • 多個(gè)工作人員同時(shí)處理任務(wù)

倉庫鏈接:https://github.com/hibiken/asynq)

#?2. 快速開始

?2.1 準(zhǔn)備工作

確保已安裝并運(yùn)行了redis

redis-server

安裝asynq軟件包

go?get?-u?github.com/hibiken/asynq

創(chuàng)建項(xiàng)目asynq_task,目錄結(jié)構(gòu):

使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

?2.2 Redis連接項(xiàng)

Asynq 使用 Redis 作為消息代理。
client.go 和 main.go 都需要連接到 Redis 進(jìn)行寫入和讀取。
我們將使用 RedisClientOpt 指定如何連接到本地 Redis 實(shí)例。

asynq.RedisClientOpt{
????Addr:?????"127.0.0.1:6379",
????Password:?"",
????DB:???????2,
}

?2.3 Task任務(wù)

type?Task?struct?{
????//?一個(gè)簡單的字符串值,表示要執(zhí)行的任務(wù)的類型.
????typename?string

????//?有效載荷保存執(zhí)行任務(wù)所需的數(shù)據(jù),有效負(fù)載值必須是可序列化的.
????payload?[]byte

????//?保存任務(wù)的選項(xiàng).
????opts?[]Option

????//?任務(wù)的結(jié)果編寫器.
????w?*ResultWriter
}

?2.4 編寫程序

test_delivery.go?: 一個(gè)封裝任務(wù)創(chuàng)建和任務(wù)處理的包

package?test_delivery

import?(
????"context"
????"encoding/json"
????"fmt"
????"github.com/hibiken/asynq"
????"log"
)

const?(
????TypeEmailDelivery?=?"email:deliver"
)

type?EmailDeliveryPayload?struct?{
????UserID?????int
????TemplateID?string
}

func?NewEmailDeliveryTask(userID?int,?tmplID?string)?(*asynq.Task,?error)?{
????payload,?err?:=?json.Marshal(EmailDeliveryPayload{UserID:?userID,?TemplateID:?tmplID})
????if?err?!=?nil?{
????????fmt.Println(err)
????????return?nil,?err
????}
????return?asynq.NewTask(TypeEmailDelivery,?payload),?nil
}

func?HandleEmailDeliveryTask(ctx?context.Context,?t?*asynq.Task)?error?{
????var?p?EmailDeliveryPayload
????if?err?:=?json.Unmarshal(t.Payload(),?&p);?err?!=?nil?{
????????return?fmt.Errorf("json.Unmarshal?failed:?%v:?%w",?err,?asynq.SkipRetry)
????}
????//邏輯處理start...
????log.Printf("Sending?Email?to?User:?user_id=%d,?template_id=%s",?p.UserID,?p.TemplateID)
????return?nil
}

client.go: 在應(yīng)用程序代碼中,導(dǎo)入上述包并用于Client將任務(wù)放入隊(duì)列中。

package?client

import?(
????"asynq_task/test_delivery"
????"github.com/hibiken/asynq"
????"log"
)

func?EmailDeliveryTaskAdd()?{
????client?:=?asynq.NewClient(asynq.RedisClientOpt{
????????Addr:?????"127.0.0.1:6379",
????????Password:?"",
????????DB:???????2,
????})
????defer?client.Close()

????task,?err?:=?test_delivery.NewEmailDeliveryTask(42,?"some:template:id")
????if?err?!=?nil?{
????????log.Fatalf("could?not?create?task:?%v",?err)
????}
????info,?err?:=?client.Enqueue(task)
????if?err?!=?nil?{
????????log.Fatalf("could?not?enqueue?task:?%v",?err)
????}
????log.Printf("enqueued?task:?id=%s?queue=%s",?info.ID,?info.Queue)
}

main.go: ?異步任務(wù)服務(wù)入口文件

接下來,啟動(dòng)一個(gè)工作服務(wù)器以在后臺(tái)處理這些任務(wù)。要啟動(dòng)后臺(tái)工作人員,使用Server并提供您Handler來處理任務(wù)。可以選擇使用ServeMux來創(chuàng)建處理程序,就像使用net/httpHandler 一樣。

package?main

import?(
????"asynq_task/test_delivery"
????"github.com/hibiken/asynq"
????"log"
)

func?main()?{
????srv?:=?asynq.NewServer(
????????asynq.RedisClientOpt{
????????????Addr:?????"127.0.0.1:6379",
????????????Password:?"",
????????????DB:???????2,
????????},
????????asynq.Config{
????????????//?每個(gè)進(jìn)程并發(fā)執(zhí)行的worker數(shù)量
????????????Concurrency:?5,
????????????//?Optionally?specify?multiple?queues?with?different?priority.
????????????Queues:?map[string]int{
????????????????"critical":?6,
????????????????"default":??3,
????????????????"low":??????1,
????????????},
????????????//?See?the?godoc?for?other?configuration?options
????????},
????)

????mux?:=?asynq.NewServeMux()
????mux.HandleFunc(test_delivery.TypeEmailDelivery,?test_delivery.HandleEmailDeliveryTask)

????if?err?:=?srv.Run(mux);?err?!=?nil?{
????????log.Fatalf("could?not?run?server:?%v",?err)
????}
}

Asynq 是一個(gè) Go 庫(https://github.com/hibiken/asynq),用于對(duì)任務(wù)進(jìn)行排隊(duì)并與工作人員異步處理它們。用來分發(fā)異步任務(wù)

package?main

import?(
????"asynq_task/test_delivery/client"
????"time"
)

func?main()?{
????for?i?:=?0;?i?<?3;?i++?{
????????client.EmailDeliveryTaskAdd()
????????time.Sleep(time.Second?*?3)
????}
}

?5. 運(yùn)行查看結(jié)果

  1. 首先,我們要先把異步任務(wù)啟動(dòng)起來準(zhǔn)備好接收,也就是啟動(dòng)cmd/main.go
  2. 啟動(dòng)test.go文件向異步任務(wù)服務(wù)添加任務(wù)隊(duì)列

結(jié)果如下:

go run main.go

使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

go run test.go

使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

#?3. 細(xì)節(jié)

?3.1 關(guān)于asynq的優(yōu)雅退出

如果異步服務(wù)突然被暫停,正在執(zhí)行的異步任務(wù)會(huì)push到隊(duì)列中,下次啟動(dòng)的時(shí)候自動(dòng)執(zhí)行。

我們可以將一個(gè)異步任務(wù)中途sleep幾秒,發(fā)送一個(gè)異步任務(wù),任務(wù)沒執(zhí)行完中途停掉任務(wù)測(cè)試出結(jié)果:

使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

再次啟動(dòng)異步任務(wù)服務(wù),發(fā)現(xiàn)這個(gè)任務(wù)被重新執(zhí)行。

?3.2 client中 client.Enqueue 的使用

1) 立即處理任務(wù)

client.Enqueue(t1,?time.Now())

2)延時(shí)處理任務(wù), 兩小時(shí)后處理

client.Enqueue(t2,?asynq.ProcessIn(time.Now().Add(2?*?time.Hour)))

3) 任務(wù)重試,最大重試次數(shù)為25次。

client.Enqueue(task,?asynq.MaxRetry(5))

4)確保任務(wù)的唯一性

4-1:使用TaskID選項(xiàng):自行生成唯一的任務(wù) ID

_,?err?:=?client.Enqueue(task,?asynq.TaskID("mytaskid"))

//?Second?task?will?fail,?err?is?ErrTaskIDConflict?(assuming?that?the?first?task?didn't?get?processed?yet)
_,?err?=?client.Enqueue(task,?asynq.TaskID("mytaskid"))

4-2:使用Unique選項(xiàng):讓 Asynq 為任務(wù)創(chuàng)建唯一性鎖

err?:=?c.Enqueue(t1,?asynq.Unique(time.Hour))

另外,asynq異步任務(wù)提供了命令行工具和Asynqmon用于監(jiān)控和管理Asynq異步任務(wù)和隊(duì)列。WebUI可以通過傳遞兩個(gè)標(biāo)志來啟用與 Prometheus 的集成。

作者:sweey_lff
原文鏈接:https://huaweicloud.csdn.net/637ef508df016f70ae4ca586.html?

相關(guān)新聞

歷經(jīng)多年發(fā)展,已成為國內(nèi)好評(píng)如潮的Linux云計(jì)算運(yùn)維、SRE、Devops、網(wǎng)絡(luò)安全、云原生、Go、Python開發(fā)專業(yè)人才培訓(xùn)機(jī)構(gòu)!

    1. 主站蜘蛛池模板: 阿勒泰市| 鄢陵县| 正镶白旗| 太康县| 罗源县| 台江县| 盘山县| 开原市| 克什克腾旗| 蕉岭县| 获嘉县| 缙云县| 青铜峡市| 登封市| 香格里拉县| 昌乐县| 古浪县| 阳泉市| 当涂县| 黄冈市| 越西县| 且末县| 长阳| 台北县| 华池县| 台中县| 探索| 刚察县| 合江县| 晋中市| 纳雍县| 资溪县| 怀来县| 黔西县| 林甸县| 临湘市| 青州市| 沐川县| 崇州市| 怀集县| 保靖县|