go web+RabbitMQ实战速学
1. 为啥要用MQ
为啥使用,因为他很牛逼
2.使用docker部署单机RabbitMQ、go客户端库
docker 镜像
https://hub.docker.com/_/rabbitmq
docker pull rabbitmq:3.8.10-management-alpine
说明:management代表是带管理后台
启动容器
docker run -d --name my-rmq
-e RABBITMQ_DEFAULT_USER=linzl
-e RABBITMQ_DEFAULT_PASS=123
-p 8081:15672
-p 5672:5672
rabbitmq:3.8.10-management-alpine
rabbitmq golang 客户端库
https://github.com/streadway/amqp
go get -u github.com/streadway/amqp
测试golang 链接 mq
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)
connection, err := amqp.Dial(dsn)
if err != nil {
log.Fatal(err)
}
defer connection.Close()
fmt.Println(connection)
}
生产者创建channel发送消息给Exchange
Exchange(有多种交换机)根据策略binding队列进行消息投递
队列具有推/拉模式
消费者使用channel获取消息,并确认接收或拒绝,重新入列给别的消费者
3. 用最简单的方式:生产者发送第一条消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
connection, err := amqp.Dial("amqp://linzl:123@192.168.1.6:5672")
if err != nil {
log.Fatal(err)
}
defer connection.Close()
// 获取channel 连接
channelConn, err := connection.Channel()
if err != nil {
log.Fatal(err)
}
defer channelConn.Close()
// 创建队列
queue, err := channelConn.QueueDeclare(
"test_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
// 发生消息
err = channelConn.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(`test002`),
},
)
if err != nil {
log.Fatal(err)
}
fmt.Println("简单生产一条消息成功")
}
4.用最简单的方式:消费者读取消息
对连接mq 简单封装
package AppInit
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
var (
err error
MQConn *amqp.Connection
)
func init() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)
MQConn, err = amqp.Dial(dsn)
if err != nil {
log.Fatal(err)
}
log.Println(MQConn.Major)
}
func GetMQ()*amqp.Connection {
return MQConn
}
消费者
package main
import (
"fmt"
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"log"
)
func main() {
mq := AppInit.GetMQ()
defer mq.Close()
channelConn, err := mq.Channel()
if err != nil {
log.Fatal(err)
}
defer channelConn.Close()
// 消费
msgs, err := channelConn.Consume(
"test_queue",
"Consumer01",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
for msg := range msgs {
msg.Ack(false) // 确认
fmt.Println(msg.DeliveryTag,string(msg.Body))
}
}
5.简单API过程、注册流程、MQ操作简单封装
案例用户注册
简单的API过程、注册流程、MQ操作简单封装
gin 框架构建用户注册api
userModel
package User
type UserModel struct {
UserID int64 `json:"user_id"`
UserName string `json:"user_name"`
}
func NewUserModel() *UserModel {
return &UserModel{}
}
user api
package main
import (
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"net/http"
"time"
)
func main() {
engine := gin.New()
engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {
user := User.NewUserModel()
err := ctx.ShouldBindJSON(user)
if err != nil {
ctx.JSON(400,err.Error())
return
}
user.UserID =time.Now().Unix() // 模拟用户注册入库
if user.UserID > 0 { // 假设入库成功
bytes, _ := json.Marshal(user)
Lib.NewMQ().SendMessage(Lib.QUEUE_NEWUSER,string(bytes))
}
ctx.JSON(200,user)
})
engine.Run(":6060")
}
mq 操作简单封装
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(queueName string, message string) error {
_, err := this.Channel.QueueDeclare(queueName, false, false,
false, false, nil)
if err != nil {
return err
}
return this.Channel.Publish("", queueName, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
6.定义交换机:向2个队列同时发送消息(QueueBind)
Exchange
Direct Exchange 也叫做直接模式交换机。交换机和和一个队列绑定起来,并指定路由键, 交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(queueName string, message string) error {
queue1, err := this.Channel.QueueDeclare(queueName, false, false,
false, false, nil)
if err != nil {
return err
}
// 假设是其他业务方用的队列
queue2, err := this.Channel.QueueDeclare(queueName+"other", false, false,
false, false, nil)
if err != nil {
return err
}
// 声明一个交换机
err = this.Channel.ExchangeDeclare("UserExchange", "direct",
false, false, false, false, nil)
if err != nil {
return err
}
// 队列1与交换机绑定
err = this.Channel.QueueBind(queue1.Name, "UserReg",
"UserExchange", false, nil)
if err != nil {
return err
}
// 队列2与交换机绑定
err = this.Channel.QueueBind(queue2.Name, "UserReg",
"UserExchange", false, nil)
if err != nil {
return err
}
return this.Channel.Publish("UserExchange", "UserReg", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
7.整理和调整代码结构、初始化队列等
初始化队列
go-rabbitmq/Lib/QueueInit.go
package Lib
import "fmt"
// UserQueueInit 用户队列初始化..
func UserQueueInit() error{
mq := NewMQ()
if mq == nil {
return fmt.Errorf("mq init err")
}
defer mq.Channel.Close()
// 声明交换机
err := mq.Channel.ExchangeDeclare(USER_EXCHANGE, "direct", false,
false, false, false, nil)
if err != nil {
return fmt.Errorf("Exchange error:%s",err.Error())
}
// 声明队列及绑定
queues := fmt.Sprintf("%s,%s",QUEUE_NEWUSER,QUEUE_NEWUSER_OTHER01)
err = mq.DecQueuueAndBind(queues, USER_EXCHANGE, USER_REG_ROUTER_KEY)
if err != nil {
return fmt.Errorf("DecQueuueAndBind error:%s",err.Error())
}
return nil
}
SendMessage改造
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
"strings"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
// 其他业务的新用户队列
QUEUE_NEWUSER_OTHER01 = "newuser-other01"
// 用户业务交换机
USER_EXCHANGE = "exchange-user"
// 用户注册路由key
USER_REG_ROUTER_KEY = "router-key-userreg"
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开
func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {
queueList := strings.Split(queues,",")
for _,queue := range queueList {
// 声明队列
q, err := this.Channel.QueueDeclare(queue, false, false,
false, false, nil)
if err != nil {
return err
}
// 绑定交换机和路由key
err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)
if err != nil {
return err
}
}
return nil
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(key string, exchange string,message string) error {
return this.Channel.Publish(exchange, key, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
拉起gin框架时初始化队列
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
errchan := make(chan error)
engine := gin.Default()
engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {
user := User.NewUserModel()
err := ctx.ShouldBindJSON(user)
if err != nil {
ctx.JSON(400,err.Error())
return
}
user.UserID =time.Now().Unix() // 模拟用户注册入库
if user.UserID > 0 { // 假设入库成功
bytes, _ := json.Marshal(user)
mq := Lib.NewMQ()
err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))
if err != nil {
log.Println(err)
errchan<-err
}
defer mq.Channel.Close()
}
ctx.JSON(200,user)
})
server := http.Server{
Addr: ":6060",
Handler: engine,
}
go func() {
err := server.ListenAndServe()
if err != nil {
errchan<-err
}
}()
go func() {
err := Lib.UserQueueInit()
if err !=nil {
errchan<-err
}
}()
go func() {
c := make(chan os.Signal)
signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL)
errchan<-fmt.Errorf("%s",<-c)
}()
errMsg := <- errchan
if errMsg != nil {
server.Shutdown(context.Background())
log.Println("退出了")
}
}
8.客户端消费注册用户消息、确认消息
模拟消费
接收消息
模拟发生邮件
ack 确认
消费
package main
import (
"encoding/json"
"fmt"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"github.com/streadway/amqp"
"time"
)
func SendMail(msgs <-chan amqp.Delivery) {
for msg := range msgs {
userModel := &User.UserModel{}
json.Unmarshal(msg.Body, userModel)
fmt.Printf("向userid=%d的用户发生邮件n",userModel.UserID)
time.time.Second)
// 应答一下
msg.Ack(false)
}
}
func main() {
mq := Lib.NewMQ()
mq.Consume(Lib.QUEUE_NEWUSER,"c1",SendMail)
defer mq.Channel.Close()
}
MQ.go 新增Consume方法
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
"strings"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
// 其他业务的新用户队列
QUEUE_NEWUSER_OTHER01 = "newuser-other01"
// 用户业务交换机
USER_EXCHANGE = "exchange-user"
// 用户注册路由key
USER_REG_ROUTER_KEY = "router-key-userreg"
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开
func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {
queueList := strings.Split(queues,",")
for _,queue := range queueList {
// 声明队列
q, err := this.Channel.QueueDeclare(queue, false, false,
false, false, nil)
if err != nil {
return err
}
// 绑定交换机和路由key
err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)
if err != nil {
return err
}
}
return nil
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(key string, exchange string,message string) error {
return this.Channel.Publish(exchange, key, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
func (this *MQ)Consume(queueName string, key string,callback func(<-chan amqp.Delivery )) {
msgs, err := this.Channel.Consume(queueName, key, false,
false, false, false, nil)
if err != nil {
log.Fatal(err)
}
callback(msgs)
}
9. 多消费者消费消息、重新入列
消费者改造支持多消费者
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"github.com/streadway/amqp"
"log"
"time"
)
func SendMail(msgs <-chan amqp.Delivery,c string) {
for msg := range msgs {
userModel := &User.UserModel{}
json.Unmarshal(msg.Body, userModel)
fmt.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID)
time.time.Second)
if c == "c1" { // 模拟c1 出问题,没有ack
//msg.Reject(false) // 拒绝消息,false不重新进入队列,此时消息会被丢弃
msg.Reject(true) // 拒绝消息,true重新进入队列,此时消息不会被丢弃
continue
}
// 应答一下
msg.Ack(false)
}
}
func main() {
cname := flag.String("c", "", "消费者名称")
flag.Parse()
if *cname == "" {
log.Fatal("c 必须指定")
}
mq := Lib.NewMQ()
mq.Consume(Lib.QUEUE_NEWUSER,*cname,SendMail)
defer mq.Channel.Close()
}
10.消费者限流:ACK后再收新消息
代码改造一下,使用协程消费
package main
import (
"encoding/json"
"flag"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"github.com/streadway/amqp"
"log"
"time"
)
func Send(c string, msg amqp.Delivery) error {
time.time.Second * 3)
userModel := &User.UserModel{}
json.Unmarshal(msg.Body, userModel)
log.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID)
msg.Ack(false)
return nil
}
func SendMail(msgs <-chan amqp.Delivery,c string) {
for msg := range msgs {
log.Println("收到消息",string(msg.Body))
// 起协程发送
go Send(c, msg)
}
}
func main() {
cname := flag.String("c", "", "消费者名称")
flag.Parse()
if *cname == "" {
log.Fatal("c 必须指定")
}
mq := Lib.NewMQ()
// 消费者限流
err := mq.Channel.Qos(2, 0, false)
if err != nil {
log.Fatal(err)
}
mq.Consume(Lib.QUEUE_NEWUSER,*cname,SendMail)
defer mq.Channel.Close()
}
消费者限流
mq.Channel.Qos(2, 0, false) 第一个参数prefetchCount 可以限制,当接受prefetchCount 条消息后
只有ack 之后可以继续接收消费下一条消息,起到保护消费者的作用
mq.Channel.Qos(2, 0, false)
11. 开启模式、记录失败的消息
当生产消息时,由于mq 的网络问题或是其他问题,可能出现发送失败的情况
当有些敏感信息又不能失败,需要确保每一条消息都发送成功
因此mq 有一个机制就是可以开发 模式,当给mq发送消息时,如果成功会有一个ack 回执
ack 成功说明发送消息成功,ack 失败时需要记录日志(写到mysql 或redis)什么的进行重发
第一步
开启模式
// SetConfirm 设置模式
func (this *MQ)Setconfirm() {
err := this.Channel.confirm(false)
if err != nil {
log.Println(err)
}
}
在MQ 的结构体添加属性
type MQ struct {
Channel *amqp.Channel
notifyConfirm chan amqp.Confirmation
}
// SetConfirm 设置模式
func (this *MQ)Setconfirm() {
err := this.Channel.confirm(false)
if err != nil {
log.Println(err)
}
this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))
}
发生消息后,当服务器确认此chan 会有数据传输过来
发生消息时调用SetConfirm 开启 模式 完整代码
MQ.go
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
"strings"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
// 其他业务的新用户队列
QUEUE_NEWUSER_OTHER01 = "newuser-other01"
// 用户业务交换机
USER_EXCHANGE = "exchange-user"
// 用户注册路由key
USER_REG_ROUTER_KEY = "router-key-userreg"
)
type MQ struct {
Channel *amqp.Channel
notifyConfirm chan amqp.Confirmation
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开
func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {
queueList := strings.Split(queues,",")
for _,queue := range queueList {
// 声明队列
q, err := this.Channel.QueueDeclare(queue, false, false,
false, false, nil)
if err != nil {
return err
}
// 绑定交换机和路由key
err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)
if err != nil {
return err
}
}
return nil
}
// SetConfirm 设置模式
func (this *MQ)Setconfirm() {
err := this.Channel.confirm(false)
if err != nil {
log.Println(err)
}
this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))
go this.Listenconfirm()
}
// ListenConfirm 监听消息
func (this *MQ)Listenconfirm() {
defer this.Channel.Close()
ret := <-this.notifyConfirm
if ret.Ack {
log.Println("消息发送成功")
} else {
log.Println("消息发送失败")
//TODO 记录日志,其他程序重发消息
}
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(key string, exchange string,message string) error {
return this.Channel.Publish(exchange, key, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
func (this *MQ)Consume(queueName string, key string,callback func(messages <-chan amqp.Delivery,cname string )) {
msgs, err := this.Channel.Consume(queueName, key, false,
false, false, false, nil)
if err != nil {
log.Fatal(err)
}
callback(msgs,key)
}
生产者
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
errchan := make(chan error)
engine := gin.Default()
engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {
user := User.NewUserModel()
err := ctx.ShouldBindJSON(user)
if err != nil {
ctx.JSON(400,err.Error())
return
}
user.UserID =time.Now().Unix() // 模拟用户注册入库
if user.UserID > 0 { // 假设入库成功
bytes, _ := json.Marshal(user)
mq := Lib.NewMQ()
// 开启 模式
mq.Setconfirm()
err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))
if err != nil {
log.Println(err)
errchan<-err
}
//defer mq.Channel.Close()
}
ctx.JSON(200,user)
})
server := http.Server{
Addr: ":6060",
Handler: engine,
}
go func() {
err := server.ListenAndServe()
if err != nil {
errchan<-err
}
}()
go func() {
err := Lib.UserQueueInit()
if err !=nil {
errchan<-err
}
}()
go func() {
c := make(chan os.Signal)
signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL)
errchan<-fmt.Errorf("%s",<-c)
}()
errMsg := <- errchan
if errMsg != nil {
server.Shutdown(context.Background())
log.Println("退出了")
}
}
12.监听消息入列回执:NotifyReturn的用法
mandatory参数
如果为true,在exchange正常且可以到达的情况下。
如果exchange+routeKey 无法投递给queue,那么MQ会将消息还给生产者
如果为false,则直接丢弃
模拟无法投递到exchange+routeKey 通过rabbitmq 管理后台,手动解绑(写多个队列的需要全部解绑)
package Lib
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"
"github.com/streadway/amqp"
"log"
"strings"
)
const (
// 用户注册队列名称
QUEUE_NEWUSER = "newuser"
// 其他业务的新用户队列
QUEUE_NEWUSER_OTHER01 = "newuser-other01"
// 用户业务交换机
USER_EXCHANGE = "exchange-user"
// 用户注册路由key
USER_REG_ROUTER_KEY = "router-key-userreg"
)
type MQ struct {
Channel *amqp.Channel
notifyConfirm chan amqp.Confirmation
// NotifyReturn的用法
notifyReturn chan amqp.Return
}
func NewMQ() *MQ {
channel, err := AppInit.GetMQ().Channel()
if err != nil {
log.Println(err)
}
return &MQ{Channel: channel}
}
// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开
func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {
queueList := strings.Split(queues,",")
for _,queue := range queueList {
// 声明队列
q, err := this.Channel.QueueDeclare(queue, false, false,
false, false, nil)
if err != nil {
return err
}
// 绑定交换机和路由key
err = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)
if err != nil {
return err
}
}
return nil
}
// SetConfirm 设置模式
func (this *MQ)Setconfirm() {
err := this.Channel.confirm(false)
if err != nil {
log.Println(err)
}
this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))
go this.Listenconfirm()
}
// ListenConfirm 监听消息
func (this *MQ)Listenconfirm() {
defer this.Channel.Close()
ret := <-this.notifyConfirm
if ret.Ack {
log.Println(":消息发送成功")
} else {
log.Println(":消息发送失败")
//TODO 记录日志,其他程序重发消息
}
}
func (this *MQ)NotifyReturn() {
this.notifyReturn = this.Channel.NotifyReturn(make(chan amqp.Return))
go this.ListenNotifyReturn()
}
// 需要使用协程
func (this *MQ)ListenNotifyReturn() {
ret := <- this.notifyReturn
if string(ret.Body) != "" {
log.Println("消息没有成功写入队列", string(ret.Body))
}
log.Println("ListenNotifyReturn",ret.Headers)
}
// SendMessage 发生消息到mq
func (this *MQ) SendMessage(key string, exchange string,message string) error {
return this.Channel.Publish(exchange, key, true, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
func (this *MQ)Consume(queueName string, key string,callback func(messages <-chan amqp.Delivery,cname string )) {
msgs, err := this.Channel.Consume(queueName, key, false,
false, false, false, nil)
if err != nil {
log.Fatal(err)
}
callback(msgs,key)
}
SendMessage 之前需要 NotifyReturn
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
errchan := make(chan error)
engine := gin.Default()
engine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {
user := User.NewUserModel()
err := ctx.ShouldBindJSON(user)
if err != nil {
ctx.JSON(400,err.Error())
return
}
user.UserID =time.Now().Unix() // 模拟用户注册入库
if user.UserID > 0 { // 假设入库成功
bytes, _ := json.Marshal(user)
mq := Lib.NewMQ()
// 开启 模式
mq.Setconfirm()
// 监听return 需要在发送消息之前
mq.NotifyReturn()
err := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))
if err != nil {
log.Println(err)
errchan<-err
}
//defer mq.Channel.Close()
}
ctx.JSON(200,user)
})
server := http.Server{
Addr: ":6060",
Handler: engine,
}
go func() {
err := server.ListenAndServe()
if err != nil {
errchan<-err
}
}()
go func() {
err := Lib.UserQueueInit()
if err !=nil {
errchan<-err
}
}()
go func() {
c := make(chan os.Signal)
signal.Notify(c,syscall.SIGINT,syscall.SIGUSR2,syscall.SIGTERM,syscall.SIGKILL)
errchan<-fmt.Errorf("%s",<-c)
}()
errMsg := <- errchan
if errMsg != nil {
server.Shutdown(context.Background())
log.Println("退出了")
}
}
13. 以用户注册为例产生的事务需求、延迟队列使用
基本实现
生产者注册成功之后发生消息
消息者接受消息后,调用邮件服务
调用失败。重新入列(要加个延迟时间,失败次数越多,延迟时间越长)
超过最大重试次数。就不发邮件了
安装插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
这是一个延迟交换机插件。省去我们自己写规则的麻烦
由于我们使用的是3.8.10。因此使用3.8.10对应的插件
拷贝plugins中,容器对应的目录是/opt/rabbitmq/plugins
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez my-rmq:/opt/rabbitmq/plugins
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
延迟队列使用
官方文档:
// ... elided code ...
Map args = new HashMap();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map headers = new HashMap();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
Map headers2 = new HashMap();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
// ... more code ...
因此go 代码改动
定义交互机的kind 应该使用x-delayed-message,args 用map[string]interface{}{"x-delayed-type":"direct"}
// UserDelayInit 创建用户延迟交换机
func UserDelayInit() error {
mq := NewMQ()
if mq == nil {
return fmt.Errorf("UserDelayInit init error")
}
defer mq.Channel.Close()
// 声明交换机
err := mq.Channel.ExchangeDeclare(USER_EXCHANGE_DELAY, "x-delayed-message", false, false,
false, false, map[string]interface{}{"x-delayed-type":"direct"})
if err != nil {
return fmt.Errorf("UserDelayInit ExchangeDeclare error")
}
// 声明队列名称及绑定
queues := fmt.Sprintf("%s",QUEUE_NEWUSER)
err = mq.DecQueuueAndBind(queues, USER_EXCHANGE_DELAY, USER_REG_ROUTER_KEY)
if err != nil {
return fmt.Errorf("DecQueuueAndBind error:%s",err.Error())
}
return nil
}
发送消息时需要设置Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒
// SendDelayMessage 发生延迟消息到mq
// delay 单位是ms
func (this *MQ) SendDelayMessage(key string, exchange string,message string,delay int) error {
return this.Channel.Publish(exchange, key, true, false,
amqp.Publishing{
Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒
ContentType: "text/plain",
Body: []byte(message),
})
}
生产部分
go func() {
err := Lib.UserQueueInit()
if err !=nil {
errchan<-err
}
// 初始化用户注册延迟队列
err = Lib.UserDelayInit()
if err !=nil {
errchan<-err
}
}()
消费者不需要调整
消息者接收消息时会延迟delay 毫秒后收到
14. 记录消费者调用失败次数、逼格SQL技巧
首先建表 user_notify
`user_notify` (
`user_id` int(11) NOT NULL,
`notify_num` int(11) NOT NULL DEFAULT '1',
`is_done` int(11) NOT NULL DEFAULT '0',
`updatetime` datetime NOT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
一旦调用邮件服务失败,写入这个表
使用MySQL库
扩展库:https://github.com/jmoiron/sqlx
安装
go get -u github.com/jmoiron/sqlx
mysql驱动:https://github.com/go-sql-driver/mysql
安装
go get -u github.com/go-sql-driver/mysql
mysql记录常规做法(伪代码)
开启事物
取出该条记录。
if 没有
insert info ...
else
if notify_num >=5 // 失败5次,程序里订阅
user_money=user_money-:money
where user_name=:from =:money`
result, err := tx.NamedExec(sql1, tm)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
// 受影响行为0 代表木有扣款
if affected == 0 {
err = tx.Rollback() // 回滚
if err != nil {
return err
}
return fmt.Errorf("扣款失败1111")
}
sql2 := "(:from,:to,:money,NOW())"
// 写日志表
result, err = tx.NamedExec(sql2, tm)
if err != nil {
err2 := tx.Rollback() // 回滚
if err2 != nil {
return err
}
return fmt.Errorf("扣款失败2222:%s",err.Error())
}
affected, err = result.RowsAffected()
if err != nil {
err = tx.Rollback() // 回滚
if err != nil {
return err
}
return fmt.Errorf("扣款失败3333",err.Error())
}
// 受影响行为0 代表木有扣款
if affected == 0 {
err = tx.Rollback() // 回滚
if err != nil {
return err
}
return fmt.Errorf("扣款失败,写日志表出错")
}
tx.Commit()
return nil
}
a公司实际扣款操作
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"
"log"
"net/http"
"os"
"os/signal"
"syscall"
)
func main() {
engine := gin.Default()
engine.Use(Trans.HandleErr())
engine.POST("/", func(ctx *gin.Context) {
transModel := Trans.NewTransModel()
err := ctx.ShouldBindJSON(&transModel)
Trans.CheckErr(err,"ShouldBindJSON error:")
// 执行转账
err = Trans.TransMoney(transModel)
Trans.CheckErr(err,"TransMoney error:")
ctx.JSON(200,gin.H{"result":transModel.String()})
})
errChan := make(chan error)
server := http.Server{
Addr: ":6060",
Handler: engine,
}
go func() {
err := server.ListenAndServe()
if err != nil {
errChan<- err
}
}()
go func() {
err := Trans.InitDB("a")
if err != nil {
errChan<- err
}
}()
go func() {
c := make(chan os.Signal)
signal.Notify(c,syscall.SIGINT,syscall.SIGTERM,syscall.SIGKILL)
errChan <- fmt.Errorf("%s",<- c)
}()
errMsg := <- errChan
if errMsg != nil {
log.Fatal(errMsg)
}
}
18.A公司转账业务逻辑:记录日志后发送消息到mq
初始化mq 队列
func TransInit() error {
mq := NewMQ()
if mq == nil {
return fmt.Errorf("UserDelayInit init error")
}
defer mq.Channel.Close()
err := mq.Channel.ExchangeDeclare(TRANS_EXCHANGE, "direct", false,
false, false, false, nil)
if err != nil {
return fmt.Errorf("mq.Channel.ExchangeDeclare TRANS_EXCHANGE error:%s",err.Error())
}
err = mq.DecQueuueAndBind(TRANS_QUEUE, TRANS_EXCHANGE, TRANS_ROUTER_KEY)
if err != nil {
return fmt.Errorf("trans DecQueuueAndBind error:%s",err.Error())
}
return nil
}
拉起框架时,起协程拉起mq
// 初始化队列
go func() {
err := Lib.TransInit()
if err != nil {
errChan<- err
}
}()
发送消息到mq
engine.POST("/", func(ctx *gin.Context) {
transModel := Trans.NewTransModel()
err := ctx.ShouldBindJSON(&transModel)
Trans.CheckErr(err,"ShouldBindJSON error:")
// 执行转账
err = Trans.TransMoney(transModel)
Trans.CheckErr(err,"TransMoney error:")
// 写mq
mq := Lib.NewMQ()
jsonBytes, _ := json.Marshal(transModel)
err = mq.SendMessage(Lib.TRANS_ROUTER_KEY, Lib.TRANS_EXCHANGE, string(jsonBytes))
Trans.CheckErr(err,"发送消息队列失败了")
ctx.JSON(200,gin.H{"result":transModel.String()})
})
19.A公司转账业务逻辑:定时”无脑”补偿机制(上)
不管发送成功与否
思路如下: 1、我们写个 “死循环”程序
2、定时取5秒或自定义秒内 :status==0 的数据,再发一次消息
3、设定定时任务。定时清理20秒内(或自定义)status==0的消息,把它改为status=2
定时任务 第三方库
看这里 https://github.com/robfig/cron
安装: go get github.com/robfig/cron/v3@v3.0.0
https://www.jtthink.com/course/play/2461
定时任务补偿代码
package main
import (
"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"
"github.com/robfig/cron/v3"
"log"
)
const failSql = ` STATUS=2 where
TIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`
var MyCron *cron.Cron
func CronInit()error {
MyCron = cron.New(cron.WithSeconds())
_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)
return err
}
// 定时取消订单
func FailTransLog() {
_, err := Trans.GetDB().(failSql)
if err != nil {
log.Println(err)
}
log.Println("更新成功")
}
func main() {
errChan := make(chan error)
go func() {
err := Trans.InitDB("a")
if err != nil {
errChan<- err
}
}()
go func() {
err := CronInit()
log.Println(err)
if err !=nil {
errChan<- err
}
MyCron.Start() // 开启定时任务
}()
err := <-errChan
if err != nil {
log.Fatal(err)
}
}
20.A公司转账逻辑: 补偿机制之交易失败后“还钱 ”
两个任务
取消交易
STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>20 >2
_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)
if err != nil {
return err
}
还钱
`,money from `translog` where `status`=2 0 limit 10
// 还钱
_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)
if err != nil {
return err
}
SQL
首先加个字段: isback ,money from translog where status=2 0 limit 10
这里面为了防止 数据不一致,都要依赖数据库事务
做个统一的事务提交
func clearTx(tx *sqlx.Tx) {
err := tx.Commit()
if err != nil && err != sql.ErrTxDone {
log.Println("tx err",err)
}
islock = false
}
全部代码
package main
import (
"context"
"database/sql"
"github.com/jmoiron/sqlx"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"
"github.com/robfig/cron/v3"
"log"
"time"
)
// 取消订单
const failSql = ` STATUS=2 where
TIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`
// 从日志表里取出status=2 并且isback=0 的进行还钱操作
const backSql = "`,money from `translog` where `status`=2 0 limit 10"
// 锁 防止上一个任务没执行完,下一个任务又开始了,产生脏读(只适用于单线程,通过变量控制锁)
var islock = false
func clearTx(tx *sqlx.Tx) {
err := tx.Commit()
if err != nil && err != sql.ErrTxDone {
log.Println("tx err",err)
}
islock = false
}
// 还钱
func BackMoney() {
if islock {
log.Println("已经锁住了")
return
}
txx, err := Trans.GetDB().BeginTxx(context.Background(), nil)
if err != nil {
log.Println("事务失败",err)
return
}
islock = true // 加锁
defer clearTx(txx) // 清理事物
time.time.Second * 8)
rows, err := txx.Queryx(backSql)
if err != nil {
log.Println("Queryx err:",err)
txx.Rollback()
}
defer rows.Close()
transModels := []Trans.TransModel{}
err = sqlx.StructScan(rows, &transModels)
if err != nil {
log.Println("StructScan err:",err)
txx.Rollback()
}
// 还钱操作
for _, row := range transModels {
_, err = txx.(" user_money=user_money+? where user_name=?",
row.Money, row.From)
if err !=nil {
txx.Rollback()
}
_, err = txx.(" isback=1 where tid=?",row.Tid)
if err !=nil {
txx.Rollback()
}
}
}
var MyCron *cron.Cron
func CronInit()error {
MyCron = cron.New(cron.WithSeconds())
_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)
if err != nil {
return err
}
// 还钱
_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)
if err != nil {
return err
}
return err
}
// 定时取消订单
func FailTransLog() {
_, err := Trans.GetDB().(failSql)
if err != nil {
log.Println(err)
}
log.Println("更新成功")
}
func main() {
errChan := make(chan error)
go func() {
err := Trans.InitDB("a")
if err != nil {
errChan<- err
}
}()
go func() {
err := CronInit()
log.Println(err)
if err !=nil {
errChan<- err
}
MyCron.Start() // 开启定时任务
}()
err := <-errChan
if err != nil {
log.Fatal(err)
}
}
21.补偿机制之重发MQ消息、B公司记录日志
今天完成的任务是
取出交易时间在8秒内,且status=0的数据,进行MQ 重发
1、SQL如下 translog where TIMESTAMPDIFF(SECOND,updatetime,now())<=8 0
2、定时器设置:为 每隔2秒 处理。
B公司日志表
和A公司一样。 不需要IsBack
tid 注意 不需要自增
b公司消费代码
package main
import (
"encoding/json"
"flag"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"
"github.com/streadway/amqp"
"log"
"fmt"
)
func saveLog(tm *Trans.TransModel, msg amqp.Delivery) {
fmt.Println(tm.Tid,tm.From,tm.Money)
sql := "(?,?,?,?,now())"
_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)
if err != nil {
log.Println("1111",err)
}
msg.Ack(false)
}
func myconsumer(messages <-chan amqp.Delivery,c string) {
for msg := range messages {
transModel := Trans.NewTransModel()
err2 := json.Unmarshal(msg.Body, transModel)
if err2 != nil {
log.Println(err2)
} else {
fmt.Println(transModel.Tid,transModel.From,transModel.Money)
go saveLog(transModel, msg)
}
}
}
var mqClient *Lib.MQ
func main() {
cname := flag.String("c", "", "消费者名称")
flag.Parse()
if *cname == "" {
log.Fatal("c 参数不能为空")
}
// 初始化db
dberr := Trans.InitDB("b")
if dberr != nil {
log.Fatal(dberr)
}
mqClient = Lib.NewMQ()
err := mqClient.Channel.Qos(2, 0, false)
if err != nil {
log.Fatal(err)
}
mqClient.Consume(Lib.TRANS_QUEUE,*cname, myconsumer)
defer mqClient.Channel.Close()
}
22.B公司业务逻辑:确认收钱
A和B 要约定个 回调地址(A是回调地址) http://localhost:8080/callback-----A
参数:tid
SQL status=1 where tid=? 0
A 公司回调接口
// 回调接口
engine.POST("/callback", func(ctx *gin.Context) {
tid := ctx.PostForm("tid")
sql := " `status`=1 where tid=? 0"
result, err := Trans.GetDB().(sql, tid)
affected, err2 := result.RowsAffected()
if err != nil || err2 != nil || affected != 1 {
ctx.String(200,"error")
} else {
ctx.String(200,"success")
}
})
B公司使用mysql 事物保证日志及确认收钱及回调成功
B消费者 消费到记录后 执行两个过程 1) 插记录 2)把钱更新给用户 3) 回调接口 3步必须都成功。否则回滚数据库。
package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"github.com/jmoiron/sqlx"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"
"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"
"github.com/streadway/amqp"
"io/ioutil"
"log"
"fmt"
"net/http"
"strings"
)
func clearTx(tx *sqlx.Tx) {
err := tx.Commit()
if err != nil && err != sql.ErrTxDone {
log.Println("clearTx error",err)
}
}
func saveLog(tm *Trans.TransModel, msg amqp.Delivery) {
fmt.Println(tm.Tid,tm.From,tm.Money)
sql := "(?,?,?,?,now())"
_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)
if err != nil {
log.Println("1111",err)
}
msg.Ack(false)
}
func saveLogWithTx(tm *Trans.TransModel, msg amqp.Delivery) {
txx, err := Trans.GetDB().BeginTxx(context.Background(), nil)
if err != nil {
log.Println("BeginTxx error",err)
return
}
defer clearTx(txx)
sql := "(?,?,?,?,now())"
_, err = txx.(sql, tm.Tid, tm.From, tm.To, tm.Money)
if err != nil {
log.Println("(:order_no,:order_user,:order_time)", req)
if err != nil {
log.Println(err)
}
}
msg.Ack(false)
}
}
func main() {
mq := Lib.NewMQ()
defer mq.Channel.Close()
err := Trans.InitDB("a")
if err != nil {
log.Fatal(err)
}
mq.Channel.Qos(2,0,false)
mq.Consume(Lib.ORDER_QUEUE,"消费者1", saveOrder)
}