go web+RabbitMQ实战速学

2023-09-28 17:00:25 0点赞 2收藏 0评论
go web+RabbitMQ实战速学

1. 为啥要用MQ

为啥使用,因为他很牛逼

2.使用docker部署单机RabbitMQ、go客户端库

docker 镜像

https://hub.docker.com/_/rabbitmq

docker pull rabbitmq:3.8.10-management-alpine

说明:management代表是带管理后台

启动容器

docker run -d --name my-rmq -e RABBITMQ_DEFAULT_USER=linzl -e RABBITMQ_DEFAULT_PASS=123 -p 8081:15672 -p 5672:5672 rabbitmq:3.8.10-management-alpine

rabbitmq golang 客户端库

https://github.com/streadway/amqp

go get -u github.com/streadway/amqp

测试golang 链接 mq

package main import ( "fmt" "github.com/streadway/amqp" "log" ) func main() { dsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672) connection, err := amqp.Dial(dsn) if err != nil { log.Fatal(err) } defer connection.Close() fmt.Println(connection) }

  1. 生产者创建channel发送消息给Exchange

  2. Exchange(有多种交换机)根据策略binding队列进行消息投递

  3. 队列具有推/拉模式

  4. 消费者使用channel获取消息,并确认接收或拒绝,重新入列给别的消费者

3. 用最简单的方式:生产者发送第一条消息

package main import ( "fmt" "github.com/streadway/amqp" "log" ) func main() { connection, err := amqp.Dial("amqp://linzl:123@192.168.1.6:5672") if err != nil { log.Fatal(err) } defer connection.Close() // 获取channel 连接 channelConn, err := connection.Channel() if err != nil { log.Fatal(err) } defer channelConn.Close() // 创建队列 queue, err := channelConn.QueueDeclare( "test_queue", false, false, false, false, nil, ) if err != nil { log.Fatal(err) } // 发生消息 err = channelConn.Publish( "", queue.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(`test002`), }, ) if err != nil { log.Fatal(err) } fmt.Println("简单生产一条消息成功") }

4.用最简单的方式:消费者读取消息

对连接mq 简单封装

package AppInit import ( "fmt" "github.com/streadway/amqp" "log" ) var ( err error MQConn *amqp.Connection ) func init() { dsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672) MQConn, err = amqp.Dial(dsn) if err != nil { log.Fatal(err) } log.Println(MQConn.Major) } func GetMQ()*amqp.Connection { return MQConn }

消费者

package main import ( "fmt" "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "log" ) func main() { mq := AppInit.GetMQ() defer mq.Close() channelConn, err := mq.Channel() if err != nil { log.Fatal(err) } defer channelConn.Close() // 消费 msgs, err := channelConn.Consume( "test_queue", "Consumer01", false, false, false, false, nil, ) if err != nil { log.Fatal(err) } for msg := range msgs { msg.Ack(false) // 确认 fmt.Println(msg.DeliveryTag,string(msg.Body)) } }

5.简单API过程、注册流程、MQ操作简单封装

案例用户注册

简单的API过程、注册流程、MQ操作简单封装

gin 框架构建用户注册api

userModel

package User type UserModel struct { UserID int64 `json:"user_id"` UserName string `json:"user_name"` } func NewUserModel() *UserModel { return &UserModel{} }

user api

package main import ( "encoding/json" "github.com/gin-gonic/gin" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "net/http" "time" ) func main() { engine := gin.New() engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) { user := User.NewUserModel() err := ctx.ShouldBindJSON(user) if err != nil { ctx.JSON(400,err.Error()) return } user.UserID =time.Now().Unix() // 模拟用户注册入库 if user.UserID > 0 { // 假设入库成功 bytes, _ := json.Marshal(user) Lib.NewMQ().SendMessage(Lib.QUEUE_NEWUSER,string(bytes)) } ctx.JSON(200,user) }) engine.Run(":6060") }

mq 操作简单封装

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" ) type MQ struct { Channel *amqp.Channel } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // SendMessage 发生消息到mq func (this *MQ) SendMessage(queueName string, message string) error { _, err := this.Channel.QueueDeclare(queueName, false, false, false, false, nil) if err != nil { return err } return this.Channel.Publish("", queueName, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) }

6.定义交换机:向2个队列同时发送消息(QueueBind)

Exchange

Direct Exchange 也叫做直接模式交换机。交换机和和一个队列绑定起来,并指定路由键, 交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" ) type MQ struct { Channel *amqp.Channel } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // SendMessage 发生消息到mq func (this *MQ) SendMessage(queueName string, message string) error { queue1, err := this.Channel.QueueDeclare(queueName, false, false, false, false, nil) if err != nil { return err } // 假设是其他业务方用的队列 queue2, err := this.Channel.QueueDeclare(queueName+"other", false, false, false, false, nil) if err != nil { return err } // 声明一个交换机 err = this.Channel.ExchangeDeclare("UserExchange", "direct", false, false, false, false, nil) if err != nil { return err } // 队列1与交换机绑定 err = this.Channel.QueueBind(queue1.Name, "UserReg", "UserExchange", false, nil) if err != nil { return err } // 队列2与交换机绑定 err = this.Channel.QueueBind(queue2.Name, "UserReg", "UserExchange", false, nil) if err != nil { return err } return this.Channel.Publish("UserExchange", "UserReg", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) }

7.整理和调整代码结构、初始化队列等

初始化队列

go-rabbitmq/Lib/QueueInit.go

package Lib import "fmt" // UserQueueInit 用户队列初始化.. func UserQueueInit() error{ mq := NewMQ() if mq == nil { return fmt.Errorf("mq init err") } defer mq.Channel.Close() // 声明交换机 err := mq.Channel.ExchangeDeclare(USER_EXCHANGE, "direct", false, false, false, false, nil) if err != nil { return fmt.Errorf("Exchange error:%s",err.Error()) } // 声明队列及绑定 queues := fmt.Sprintf("%s,%s",QUEUE_NEWUSER,QUEUE_NEWUSER_OTHER01) err = mq.DecQueuueAndBind(queues, USER_EXCHANGE, USER_REG_ROUTER_KEY) if err != nil { return fmt.Errorf("DecQueuueAndBind error:%s",err.Error()) } return nil }

SendMessage改造

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" "strings" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" // 其他业务的新用户队列 QUEUE_NEWUSER_OTHER01 = "newuser-other01" // 用户业务交换机 USER_EXCHANGE = "exchange-user" // 用户注册路由key USER_REG_ROUTER_KEY = "router-key-userreg" ) type MQ struct { Channel *amqp.Channel } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开 func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error { queueList := strings.Split(queues,",") for _,queue := range queueList { // 声明队列 q, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil) if err != nil { return err } // 绑定交换机和路由key err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil) if err != nil { return err } } return nil } // SendMessage 发生消息到mq func (this *MQ) SendMessage(key string, exchange string,message string) error { return this.Channel.Publish(exchange, key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) }

拉起gin框架时初始化队列

package main import ( "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "log" "net/http" "os" "os/signal" "syscall" "time" ) func main() { errchan := make(chan error) engine := gin.Default() engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) { user := User.NewUserModel() err := ctx.ShouldBindJSON(user) if err != nil { ctx.JSON(400,err.Error()) return } user.UserID =time.Now().Unix() // 模拟用户注册入库 if user.UserID > 0 { // 假设入库成功 bytes, _ := json.Marshal(user) mq := Lib.NewMQ() err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes)) if err != nil { log.Println(err) errchan<-err } defer mq.Channel.Close() } ctx.JSON(200,user) }) server := http.Server{ Addr: ":6060", Handler: engine, } go func() { err := server.ListenAndServe() if err != nil { errchan<-err } }() go func() { err := Lib.UserQueueInit() if err !=nil { errchan<-err } }() go func() { c := make(chan os.Signal) signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL) errchan<-fmt.Errorf("%s",<-c) }() errMsg := <- errchan if errMsg != nil { server.Shutdown(context.Background()) log.Println("退出了") } }

8.客户端消费注册用户消息、确认消息

模拟消费

  1. 接收消息

  2. 模拟发生邮件

  3. ack 确认

消费

package main import ( "encoding/json" "fmt" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "github.com/streadway/amqp" "time" ) func SendMail(msgs <-chan amqp.Delivery) { for msg := range msgs { userModel := &User.UserModel{} json.Unmarshal(msg.Body, userModel) fmt.Printf("向userid=%d的用户发生邮件n",userModel.UserID) time.time.Second) // 应答一下 msg.Ack(false) } } func main() { mq := Lib.NewMQ() mq.Consume(Lib.QUEUE_NEWUSER,"c1",SendMail) defer mq.Channel.Close() }

MQ.go 新增Consume方法

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" "strings" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" // 其他业务的新用户队列 QUEUE_NEWUSER_OTHER01 = "newuser-other01" // 用户业务交换机 USER_EXCHANGE = "exchange-user" // 用户注册路由key USER_REG_ROUTER_KEY = "router-key-userreg" ) type MQ struct { Channel *amqp.Channel } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开 func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error { queueList := strings.Split(queues,",") for _,queue := range queueList { // 声明队列 q, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil) if err != nil { return err } // 绑定交换机和路由key err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil) if err != nil { return err } } return nil } // SendMessage 发生消息到mq func (this *MQ) SendMessage(key string, exchange string,message string) error { return this.Channel.Publish(exchange, key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } func (this *MQ)Consume(queueName string, key string,callback func(<-chan amqp.Delivery )) { msgs, err := this.Channel.Consume(queueName, key, false, false, false, false, nil) if err != nil { log.Fatal(err) } callback(msgs) }

9. 多消费者消费消息、重新入列

消费者改造支持多消费者

package main import ( "encoding/json" "flag" "fmt" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "github.com/streadway/amqp" "log" "time" ) func SendMail(msgs <-chan amqp.Delivery,c string) { for msg := range msgs { userModel := &User.UserModel{} json.Unmarshal(msg.Body, userModel) fmt.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID) time.time.Second) if c == "c1" { // 模拟c1 出问题,没有ack //msg.Reject(false) // 拒绝消息,false不重新进入队列,此时消息会被丢弃 msg.Reject(true) // 拒绝消息,true重新进入队列,此时消息不会被丢弃 continue } // 应答一下 msg.Ack(false) } } func main() { cname := flag.String("c", "", "消费者名称") flag.Parse() if *cname == "" { log.Fatal("c 必须指定") } mq := Lib.NewMQ() mq.Consume(Lib.QUEUE_NEWUSER,*cname,SendMail) defer mq.Channel.Close() }

10.消费者限流:ACK后再收新消息

代码改造一下,使用协程消费

package main import ( "encoding/json" "flag" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "github.com/streadway/amqp" "log" "time" ) func Send(c string, msg amqp.Delivery) error { time.time.Second * 3) userModel := &User.UserModel{} json.Unmarshal(msg.Body, userModel) log.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID) msg.Ack(false) return nil } func SendMail(msgs <-chan amqp.Delivery,c string) { for msg := range msgs { log.Println("收到消息",string(msg.Body)) // 起协程发送 go Send(c, msg) } } func main() { cname := flag.String("c", "", "消费者名称") flag.Parse() if *cname == "" { log.Fatal("c 必须指定") } mq := Lib.NewMQ() // 消费者限流 err := mq.Channel.Qos(2, 0, false) if err != nil { log.Fatal(err) } mq.Consume(Lib.QUEUE_NEWUSER,*cname,SendMail) defer mq.Channel.Close() }

消费者限流

mq.Channel.Qos(2, 0, false)  第一个参数prefetchCount 可以限制,当接受prefetchCount 条消息后

只有ack 之后可以继续接收消费下一条消息,起到保护消费者的作用

mq.Channel.Qos(2, 0, false)

11. 开启模式、记录失败的消息

当生产消息时,由于mq 的网络问题或是其他问题,可能出现发送失败的情况

当有些敏感信息又不能失败,需要确保每一条消息都发送成功

因此mq 有一个机制就是可以开发 模式,当给mq发送消息时,如果成功会有一个ack 回执

ack 成功说明发送消息成功,ack 失败时需要记录日志(写到mysql 或redis)什么的进行重发

第一步

  1. 开启模式

// SetConfirm 设置模式 func (this *MQ)Setconfirm() { err := this.Channel.confirm(false) if err != nil { log.Println(err) } }

  1. 在MQ 的结构体添加属性

type MQ struct { Channel *amqp.Channel notifyConfirm chan amqp.Confirmation }

// SetConfirm 设置模式 func (this *MQ)Setconfirm() { err := this.Channel.confirm(false) if err != nil { log.Println(err) } this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation)) }

发生消息后,当服务器确认此chan 会有数据传输过来

发生消息时调用SetConfirm 开启 模式 完整代码

MQ.go

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" "strings" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" // 其他业务的新用户队列 QUEUE_NEWUSER_OTHER01 = "newuser-other01" // 用户业务交换机 USER_EXCHANGE = "exchange-user" // 用户注册路由key USER_REG_ROUTER_KEY = "router-key-userreg" ) type MQ struct { Channel *amqp.Channel notifyConfirm chan amqp.Confirmation } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开 func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error { queueList := strings.Split(queues,",") for _,queue := range queueList { // 声明队列 q, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil) if err != nil { return err } // 绑定交换机和路由key err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil) if err != nil { return err } } return nil } // SetConfirm 设置模式 func (this *MQ)Setconfirm() { err := this.Channel.confirm(false) if err != nil { log.Println(err) } this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation)) go this.Listenconfirm() } // ListenConfirm 监听消息 func (this *MQ)Listenconfirm() { defer this.Channel.Close() ret := <-this.notifyConfirm if ret.Ack { log.Println("消息发送成功") } else { log.Println("消息发送失败") //TODO 记录日志,其他程序重发消息 } } // SendMessage 发生消息到mq func (this *MQ) SendMessage(key string, exchange string,message string) error { return this.Channel.Publish(exchange, key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } func (this *MQ)Consume(queueName string, key string,callback func(messages <-chan amqp.Delivery,cname string )) { msgs, err := this.Channel.Consume(queueName, key, false, false, false, false, nil) if err != nil { log.Fatal(err) } callback(msgs,key) }

生产者

package main import ( "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "log" "net/http" "os" "os/signal" "syscall" "time" ) func main() { errchan := make(chan error) engine := gin.Default() engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) { user := User.NewUserModel() err := ctx.ShouldBindJSON(user) if err != nil { ctx.JSON(400,err.Error()) return } user.UserID =time.Now().Unix() // 模拟用户注册入库 if user.UserID > 0 { // 假设入库成功 bytes, _ := json.Marshal(user) mq := Lib.NewMQ() // 开启 模式 mq.Setconfirm() err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes)) if err != nil { log.Println(err) errchan<-err } //defer mq.Channel.Close() } ctx.JSON(200,user) }) server := http.Server{ Addr: ":6060", Handler: engine, } go func() { err := server.ListenAndServe() if err != nil { errchan<-err } }() go func() { err := Lib.UserQueueInit() if err !=nil { errchan<-err } }() go func() { c := make(chan os.Signal) signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL) errchan<-fmt.Errorf("%s",<-c) }() errMsg := <- errchan if errMsg != nil { server.Shutdown(context.Background()) log.Println("退出了") } }

12.监听消息入列回执:NotifyReturn的用法

mandatory参数

如果为true,在exchange正常且可以到达的情况下。

如果exchange+routeKey 无法投递给queue,那么MQ会将消息还给生产者

如果为false,则直接丢弃

模拟无法投递到exchange+routeKey 通过rabbitmq 管理后台,手动解绑(写多个队列的需要全部解绑)

package Lib import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit" "github.com/streadway/amqp" "log" "strings" ) const ( // 用户注册队列名称 QUEUE_NEWUSER = "newuser" // 其他业务的新用户队列 QUEUE_NEWUSER_OTHER01 = "newuser-other01" // 用户业务交换机 USER_EXCHANGE = "exchange-user" // 用户注册路由key USER_REG_ROUTER_KEY = "router-key-userreg" ) type MQ struct { Channel *amqp.Channel notifyConfirm chan amqp.Confirmation // NotifyReturn的用法 notifyReturn chan amqp.Return } func NewMQ() *MQ { channel, err := AppInit.GetMQ().Channel() if err != nil { log.Println(err) } return &MQ{Channel: channel} } // DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开 func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error { queueList := strings.Split(queues,",") for _,queue := range queueList { // 声明队列 q, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil) if err != nil { return err } // 绑定交换机和路由key err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil) if err != nil { return err } } return nil } // SetConfirm 设置模式 func (this *MQ)Setconfirm() { err := this.Channel.confirm(false) if err != nil { log.Println(err) } this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation)) go this.Listenconfirm() } // ListenConfirm 监听消息 func (this *MQ)Listenconfirm() { defer this.Channel.Close() ret := <-this.notifyConfirm if ret.Ack { log.Println(":消息发送成功") } else { log.Println(":消息发送失败") //TODO 记录日志,其他程序重发消息 } } func (this *MQ)NotifyReturn() { this.notifyReturn = this.Channel.NotifyReturn(make(chan amqp.Return)) go this.ListenNotifyReturn() } // 需要使用协程 func (this *MQ)ListenNotifyReturn() { ret := <- this.notifyReturn if string(ret.Body) != "" { log.Println("消息没有成功写入队列", string(ret.Body)) } log.Println("ListenNotifyReturn",ret.Headers) } // SendMessage 发生消息到mq func (this *MQ) SendMessage(key string, exchange string,message string) error { return this.Channel.Publish(exchange, key, true, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } func (this *MQ)Consume(queueName string, key string,callback func(messages <-chan amqp.Delivery,cname string )) { msgs, err := this.Channel.Consume(queueName, key, false, false, false, false, nil) if err != nil { log.Fatal(err) } callback(msgs,key) }

SendMessage 之前需要 NotifyReturn

package main import ( "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User" "log" "net/http" "os" "os/signal" "syscall" "time" ) func main() { errchan := make(chan error) engine := gin.Default() engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) { user := User.NewUserModel() err := ctx.ShouldBindJSON(user) if err != nil { ctx.JSON(400,err.Error()) return } user.UserID =time.Now().Unix() // 模拟用户注册入库 if user.UserID > 0 { // 假设入库成功 bytes, _ := json.Marshal(user) mq := Lib.NewMQ() // 开启 模式 mq.Setconfirm() // 监听return 需要在发送消息之前 mq.NotifyReturn() err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes)) if err != nil { log.Println(err) errchan<-err } //defer mq.Channel.Close() } ctx.JSON(200,user) }) server := http.Server{ Addr: ":6060", Handler: engine, } go func() { err := server.ListenAndServe() if err != nil { errchan<-err } }() go func() { err := Lib.UserQueueInit() if err !=nil { errchan<-err } }() go func() { c := make(chan os.Signal) signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL) errchan<-fmt.Errorf("%s",<-c) }() errMsg := <- errchan if errMsg != nil { server.Shutdown(context.Background()) log.Println("退出了") } }

13. 以用户注册为例产生的事务需求、延迟队列使用

基本实现

  1. 生产者注册成功之后发生消息

  2. 消息者接受消息后,调用邮件服务

  3. 调用失败。重新入列(要加个延迟时间,失败次数越多,延迟时间越长)

  4. 超过最大重试次数。就不发邮件了

安装插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

这是一个延迟交换机插件。省去我们自己写规则的麻烦

由于我们使用的是3.8.10。因此使用3.8.10对应的插件

拷贝plugins中,容器对应的目录是/opt/rabbitmq/plugins

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez my-rmq:/opt/rabbitmq/plugins

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

延迟队列使用

官方文档:

// ... elided code ... Map args = new HashMap(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...

// ... elided code ... byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map headers = new HashMap(); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes); byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8"); Map headers2 = new HashMap(); headers2.put("x-delay", 1000); AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2); channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2); // ... more code ...

因此go 代码改动

定义交互机的kind 应该使用x-delayed-message,args 用map[string]interface{}{"x-delayed-type":"direct"}

// UserDelayInit 创建用户延迟交换机 func UserDelayInit() error { mq := NewMQ() if mq == nil { return fmt.Errorf("UserDelayInit init error") } defer mq.Channel.Close() // 声明交换机 err := mq.Channel.ExchangeDeclare(USER_EXCHANGE_DELAY, "x-delayed-message", false, false, false, false, map[string]interface{}{"x-delayed-type":"direct"}) if err != nil { return fmt.Errorf("UserDelayInit ExchangeDeclare error") } // 声明队列名称及绑定 queues := fmt.Sprintf("%s",QUEUE_NEWUSER) err = mq.DecQueuueAndBind(queues, USER_EXCHANGE_DELAY, USER_REG_ROUTER_KEY) if err != nil { return fmt.Errorf("DecQueuueAndBind error:%s",err.Error()) } return nil }

发送消息时需要设置Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒

// SendDelayMessage 发生延迟消息到mq // delay 单位是ms func (this *MQ) SendDelayMessage(key string, exchange string,message string,delay int) error { return this.Channel.Publish(exchange, key, true, false, amqp.Publishing{ Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒 ContentType: "text/plain", Body: []byte(message), }) }

生产部分

go func() { err := Lib.UserQueueInit() if err !=nil { errchan<-err } // 初始化用户注册延迟队列 err = Lib.UserDelayInit() if err !=nil { errchan<-err } }()

消费者不需要调整

消息者接收消息时会延迟delay 毫秒后收到

14. 记录消费者调用失败次数、逼格SQL技巧

首先建表 user_notify

`user_notify` ( `user_id` int(11) NOT NULL, `notify_num` int(11) NOT NULL DEFAULT '1', `is_done` int(11) NOT NULL DEFAULT '0', `updatetime` datetime NOT NULL, PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

一旦调用邮件服务失败,写入这个表

使用MySQL库

扩展库:https://github.com/jmoiron/sqlx

安装

go get -u github.com/jmoiron/sqlx

mysql驱动:https://github.com/go-sql-driver/mysql

安装

go get -u github.com/go-sql-driver/mysql

mysql记录常规做法(伪代码)

开启事物 取出该条记录。 if 没有 insert info ... else if notify_num >=5 // 失败5次,程序里订阅 user_money=user_money-:money where user_name=:from =:money` result, err := tx.NamedExec(sql1, tm) if err != nil { return err } affected, err := result.RowsAffected() if err != nil { return err } // 受影响行为0 代表木有扣款 if affected == 0 { err = tx.Rollback() // 回滚 if err != nil { return err } return fmt.Errorf("扣款失败1111") } sql2 := "(:from,:to,:money,NOW())" // 写日志表 result, err = tx.NamedExec(sql2, tm) if err != nil { err2 := tx.Rollback() // 回滚 if err2 != nil { return err } return fmt.Errorf("扣款失败2222:%s",err.Error()) } affected, err = result.RowsAffected() if err != nil { err = tx.Rollback() // 回滚 if err != nil { return err } return fmt.Errorf("扣款失败3333",err.Error()) } // 受影响行为0 代表木有扣款 if affected == 0 { err = tx.Rollback() // 回滚 if err != nil { return err } return fmt.Errorf("扣款失败,写日志表出错") } tx.Commit() return nil }

a公司实际扣款操作

package main import ( "fmt" "github.com/gin-gonic/gin" "github.com/linzhenlong/golang-jt/go-rabbitmq/Trans" "log" "net/http" "os" "os/signal" "syscall" ) func main() { engine := gin.Default() engine.Use(Trans.HandleErr()) engine.POST("/", func(ctx *gin.Context) { transModel := Trans.NewTransModel() err := ctx.ShouldBindJSON(&transModel) Trans.CheckErr(err,"ShouldBindJSON error:") // 执行转账 err = Trans.TransMoney(transModel) Trans.CheckErr(err,"TransMoney error:") ctx.JSON(200,gin.H{"result":transModel.String()}) }) errChan := make(chan error) server := http.Server{ Addr: ":6060", Handler: engine, } go func() { err := server.ListenAndServe() if err != nil { errChan<- err } }() go func() { err := Trans.InitDB("a") if err != nil { errChan<- err } }() go func() { c := make(chan os.Signal) signal.Notify(c,syscall.SIGINT,syscall.SIGTERM,syscall.SIGKILL) errChan <- fmt.Errorf("%s",<- c) }() errMsg := <- errChan if errMsg != nil { log.Fatal(errMsg) } }

18.A公司转账业务逻辑:记录日志后发送消息到mq

初始化mq 队列

func TransInit() error { mq := NewMQ() if mq == nil { return fmt.Errorf("UserDelayInit init error") } defer mq.Channel.Close() err := mq.Channel.ExchangeDeclare(TRANS_EXCHANGE, "direct", false, false, false, false, nil) if err != nil { return fmt.Errorf("mq.Channel.ExchangeDeclare TRANS_EXCHANGE error:%s",err.Error()) } err = mq.DecQueuueAndBind(TRANS_QUEUE, TRANS_EXCHANGE, TRANS_ROUTER_KEY) if err != nil { return fmt.Errorf("trans DecQueuueAndBind error:%s",err.Error()) } return nil }

拉起框架时,起协程拉起mq

// 初始化队列 go func() { err := Lib.TransInit() if err != nil { errChan<- err } }()

发送消息到mq

engine.POST("/", func(ctx *gin.Context) { transModel := Trans.NewTransModel() err := ctx.ShouldBindJSON(&transModel) Trans.CheckErr(err,"ShouldBindJSON error:") // 执行转账 err = Trans.TransMoney(transModel) Trans.CheckErr(err,"TransMoney error:") // 写mq mq := Lib.NewMQ() jsonBytes, _ := json.Marshal(transModel) err = mq.SendMessage(Lib.TRANS_ROUTER_KEY, Lib.TRANS_EXCHANGE, string(jsonBytes)) Trans.CheckErr(err,"发送消息队列失败了") ctx.JSON(200,gin.H{"result":transModel.String()}) })

19.A公司转账业务逻辑:定时”无脑”补偿机制(上)

不管发送成功与否

思路如下: 1、我们写个 “死循环”程序

2、定时取5秒或自定义秒内 :status==0 的数据,再发一次消息

3、设定定时任务。定时清理20秒内(或自定义)status==0的消息,把它改为status=2

定时任务 第三方库

看这里 https://github.com/robfig/cron

安装: go get github.com/robfig/cron/v3@v3.0.0

https://www.jtthink.com/course/play/2461

定时任务补偿代码

package main import ( "github.com/linzhenlong/golang-jt/go-rabbitmq/Trans" "github.com/robfig/cron/v3" "log" ) const failSql = ` STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>30 >2` var MyCron *cron.Cron func CronInit()error { MyCron = cron.New(cron.WithSeconds()) _, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog) return err } // 定时取消订单 func FailTransLog() { _, err := Trans.GetDB().(failSql) if err != nil { log.Println(err) } log.Println("更新成功") } func main() { errChan := make(chan error) go func() { err := Trans.InitDB("a") if err != nil { errChan<- err } }() go func() { err := CronInit() log.Println(err) if err !=nil { errChan<- err } MyCron.Start() // 开启定时任务 }() err := <-errChan if err != nil { log.Fatal(err) } }

20.A公司转账逻辑: 补偿机制之交易失败后“还钱 ”

两个任务

取消交易

STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>20 >2

_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog) if err != nil { return err }

还钱

`,money from `translog` where `status`=2 0 limit 10

// 还钱 _, err = MyCron.AddFunc("0/4 * * * * *", BackMoney) if err != nil { return err }

SQL

首先加个字段: isback ,money from translog  where status=2 0 limit 10

这里面为了防止 数据不一致,都要依赖数据库事务

做个统一的事务提交

func clearTx(tx *sqlx.Tx) { err := tx.Commit() if err != nil && err != sql.ErrTxDone { log.Println("tx err",err) } islock = false }

全部代码

package main import ( "context" "database/sql" "github.com/jmoiron/sqlx" "github.com/linzhenlong/golang-jt/go-rabbitmq/Trans" "github.com/robfig/cron/v3" "log" "time" ) // 取消订单 const failSql = ` STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>30 >2` // 从日志表里取出status=2 并且isback=0 的进行还钱操作 const backSql = "`,money from `translog` where `status`=2 0 limit 10" // 锁 防止上一个任务没执行完,下一个任务又开始了,产生脏读(只适用于单线程,通过变量控制锁) var islock = false func clearTx(tx *sqlx.Tx) { err := tx.Commit() if err != nil && err != sql.ErrTxDone { log.Println("tx err",err) } islock = false } // 还钱 func BackMoney() { if islock { log.Println("已经锁住了") return } txx, err := Trans.GetDB().BeginTxx(context.Background(), nil) if err != nil { log.Println("事务失败",err) return } islock = true // 加锁 defer clearTx(txx) // 清理事物 time.time.Second * 8) rows, err := txx.Queryx(backSql) if err != nil { log.Println("Queryx err:",err) txx.Rollback() } defer rows.Close() transModels := []Trans.TransModel{} err = sqlx.StructScan(rows, &transModels) if err != nil { log.Println("StructScan err:",err) txx.Rollback() } // 还钱操作 for _, row := range transModels { _, err = txx.(" user_money=user_money+? where user_name=?", row.Money, row.From) if err !=nil { txx.Rollback() } _, err = txx.(" isback=1 where tid=?",row.Tid) if err !=nil { txx.Rollback() } } } var MyCron *cron.Cron func CronInit()error { MyCron = cron.New(cron.WithSeconds()) _, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog) if err != nil { return err } // 还钱 _, err = MyCron.AddFunc("0/4 * * * * *", BackMoney) if err != nil { return err } return err } // 定时取消订单 func FailTransLog() { _, err := Trans.GetDB().(failSql) if err != nil { log.Println(err) } log.Println("更新成功") } func main() { errChan := make(chan error) go func() { err := Trans.InitDB("a") if err != nil { errChan<- err } }() go func() { err := CronInit() log.Println(err) if err !=nil { errChan<- err } MyCron.Start() // 开启定时任务 }() err := <-errChan if err != nil { log.Fatal(err) } }

21.补偿机制之重发MQ消息、B公司记录日志

今天完成的任务是

取出交易时间在8秒内,且status=0的数据,进行MQ 重发

1、SQL如下 translog where TIMESTAMPDIFF(SECOND,updatetime,now())<=8 0

2、定时器设置:为 每隔2秒 处理。

B公司日志表

和A公司一样。 不需要IsBack tid 注意 不需要自增

b公司消费代码

package main import ( "encoding/json" "flag" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/Trans" "github.com/streadway/amqp" "log" "fmt" ) func saveLog(tm *Trans.TransModel, msg amqp.Delivery) { fmt.Println(tm.Tid,tm.From,tm.Money) sql := "(?,?,?,?,now())" _, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money) if err != nil { log.Println("1111",err) } msg.Ack(false) } func myconsumer(messages <-chan amqp.Delivery,c string) { for msg := range messages { transModel := Trans.NewTransModel() err2 := json.Unmarshal(msg.Body, transModel) if err2 != nil { log.Println(err2) } else { fmt.Println(transModel.Tid,transModel.From,transModel.Money) go saveLog(transModel, msg) } } } var mqClient *Lib.MQ func main() { cname := flag.String("c", "", "消费者名称") flag.Parse() if *cname == "" { log.Fatal("c 参数不能为空") } // 初始化db dberr := Trans.InitDB("b") if dberr != nil { log.Fatal(dberr) } mqClient = Lib.NewMQ() err := mqClient.Channel.Qos(2, 0, false) if err != nil { log.Fatal(err) } mqClient.Consume(Lib.TRANS_QUEUE,*cname, myconsumer) defer mqClient.Channel.Close() }

22.B公司业务逻辑:确认收钱

A和B 要约定个 回调地址(A是回调地址)   http://localhost:8080/callback-----A

参数:tid

SQL status=1 where tid=? 0

A 公司回调接口

// 回调接口 engine.POST("/callback", func(ctx *gin.Context) { tid := ctx.PostForm("tid") sql := " `status`=1 where tid=? 0" result, err := Trans.GetDB().(sql, tid) affected, err2 := result.RowsAffected() if err != nil || err2 != nil || affected != 1 { ctx.String(200,"error") } else { ctx.String(200,"success") } })

B公司使用mysql 事物保证日志及确认收钱及回调成功

B消费者 消费到记录后 执行两个过程 1) 插记录 2)把钱更新给用户 3) 回调接口 3步必须都成功。否则回滚数据库。

package main import ( "context" "database/sql" "encoding/json" "flag" "github.com/jmoiron/sqlx" "github.com/linzhenlong/golang-jt/go-rabbitmq/Lib" "github.com/linzhenlong/golang-jt/go-rabbitmq/Trans" "github.com/streadway/amqp" "io/ioutil" "log" "fmt" "net/http" "strings" ) func clearTx(tx *sqlx.Tx) { err := tx.Commit() if err != nil && err != sql.ErrTxDone { log.Println("clearTx error",err) } } func saveLog(tm *Trans.TransModel, msg amqp.Delivery) { fmt.Println(tm.Tid,tm.From,tm.Money) sql := "(?,?,?,?,now())" _, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money) if err != nil { log.Println("1111",err) } msg.Ack(false) } func saveLogWithTx(tm *Trans.TransModel, msg amqp.Delivery) { txx, err := Trans.GetDB().BeginTxx(context.Background(), nil) if err != nil { log.Println("BeginTxx error",err) return } defer clearTx(txx) sql := "(?,?,?,?,now())" _, err = txx.(sql, tm.Tid, tm.From, tm.To, tm.Money) if err != nil { log.Println("(:order_no,:order_user,:order_time)", req) if err != nil { log.Println(err) } } msg.Ack(false) } } func main() { mq := Lib.NewMQ() defer mq.Channel.Close() err := Trans.InitDB("a") if err != nil { log.Fatal(err) } mq.Channel.Qos(2,0,false) mq.Consume(Lib.ORDER_QUEUE,"消费者1", saveOrder) }

展开 收起

UGREEN 绿联 DX4600 Pro 4盘位NAS(奔腾N6005、8GB)

UGREEN 绿联 DX4600 Pro 4盘位NAS(奔腾N6005、8GB)

1849元起

ZSpace 极空间 私有云 Z4Pro 8G版 4盘位NAS存储(N97、8GB)

ZSpace 极空间 私有云 Z4Pro 8G版 4盘位NAS存储(N97、8GB)

2485.01元起

UGREEN 绿联 DX4600 四盘位NAS存储 (赛扬N5105、8GB)

UGREEN 绿联 DX4600 四盘位NAS存储 (赛扬N5105、8GB)

1749元起

ZSpace 极空间 私有云 Z4Pro 性能版 NAS存储(N305、16GB)

ZSpace 极空间 私有云 Z4Pro 性能版 NAS存储(N305、16GB)

3499元起

QNAP 威联通 TS-464C2 四盘位 NAS网络存储(赛扬N5095、8GB)黑色

QNAP 威联通 TS-464C2 四盘位 NAS网络存储(赛扬N5095、8GB)黑色

2599元起

UGREEN 绿联 DX4600+ 4盘位NAS(赛扬N5105、8GB)

UGREEN 绿联 DX4600+ 4盘位NAS(赛扬N5105、8GB)

1999元起

Synology 群晖 DS224+ 双盘位NAS(赛扬J4125、2GB)

Synology 群晖 DS224+ 双盘位NAS(赛扬J4125、2GB)

2849元起

ZSpace 极空间 私有云 Z423 旗舰版 8盘位NAS存储(锐龙R7-5825U、64GB)

ZSpace 极空间 私有云 Z423 旗舰版 8盘位NAS存储(锐龙R7-5825U、64GB)

5899元起

ZSpace 极空间 私有云 Z4Pro 16G版 4盘位NAS存储(N97、16GB)

ZSpace 极空间 私有云 Z4Pro 16G版 4盘位NAS存储(N97、16GB)

2799元起

QNAP 威联通 TS-466C 四盘位NAS(奔腾N6005、8GB)

QNAP 威联通 TS-466C 四盘位NAS(奔腾N6005、8GB)

3199元起

TERRAMASTER 铁威马 F4-424 Pro 四盘位NAS(Intel Core i3、32GB)黑色

TERRAMASTER 铁威马 F4-424 Pro 四盘位NAS(Intel Core i3、32GB)黑色

2499元起

Synology 群晖 DS220+ 2盘位NAS (赛扬J4025、2GB)

Synology 群晖 DS220+ 2盘位NAS (赛扬J4025、2GB)

2179元起

UGREEN 绿联 DH2600 双盘位NAS (N5105、4GB)

UGREEN 绿联 DH2600 双盘位NAS (N5105、4GB)

1299元起

ZSpace 极空间 私有云Z2Pro 四核2盘位NAS家庭个人云网络存储服务器手机平板扩容适用iPhone15 水

ZSpace 极空间 私有云Z2Pro 四核2盘位NAS家庭个人云网络存储服务器手机平板扩容适用iPhone15 水

1290.51元起

ZSpace 极空间 私有云 T2 2盘位NAS存储(RK 3568、4GB)

ZSpace 极空间 私有云 T2 2盘位NAS存储(RK 3568、4GB)

1699元起

ASUS 华硕 AS6702T 4盘位NAS存储 黑色(Core2 Quad Q8300、4GB)

ASUS 华硕 AS6702T 4盘位NAS存储 黑色(Core2 Quad Q8300、4GB)

4299元起
0评论

当前文章无评论,是时候发表评论了
提示信息

取消
确认
评论举报

相关好价推荐
查看更多好价

相关文章推荐

更多精彩文章
更多精彩文章

linzl

Ta还没有介绍自己

关注 打赏
最新文章 热门文章
2
扫一下,分享更方便,购买更轻松