golang之消息队列rabbitmq
文章目录
- 消息队列的作用:
- 收发流程
- docker安装
- 官方文档
- 消息收发模式
- 1.简单模式
- 2.工作队列模式
- 3.发布订阅模式(扇出模式)
- 4.direct(路由)模式:
- 5.topic模式
- 用go操作rabbitmq
- 写代码的思路
- 收发模式2示例:
- fanout模式示例:
- routing(路由)模式示例
- topic模式
- 高级操作
- 消费者确认模式:
- 消费限流
- 延迟消息
- 持久化
- 交换机持久化:
- 队列持久化
- 消息持久化
消息队列的作用:
- 异步,将同步的消息变为异步,例如我们可以使用rpc调用另一个服务,但是我们必须等待返回(同步),用mq可以变异步
- 解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启
- 抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力
图示:
用户注册后发邮件和虚拟币:
异步解耦图:
抗压图:
收发流程
- 生产者发送消息的流程
- 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
- 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过 routingKey (路由Key)将交换器和队列绑定( binding )起来
- 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
- 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接
- 消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及
做一些准备工作 - 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
docker安装
拉取image:
docker pull rabbitmq:3.8-management-alpine
启动容器:
5672进行通信,15672 ,web管理工具
docker run -d --name rmq \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:3.8-management-alpine
官方文档
官方文档
消息收发模式
明确连个概念,exchange(路由) queue(队列)
工作模式:
以下用p
代指生产者,用 c
代指消费者,用 x
代指 exchange
1.简单模式
p发给队列,单个c消费,这里用的默认exchange,收发模式是direct
2.工作队列模式
p发给队列,多个c消费,这里用的默认exchange,收发模式是direct
3.发布订阅模式(扇出模式)
fandout模式:p将消息发给x,x将同一个消息发给所有q,c 按 1,2方式消费q的消息
4.direct(路由)模式:
p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息
5.topic模式
p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息,与4的区别是topic可以有通配符匹配
用go操作rabbitmq
写代码的思路
在初始化中完成
- 声明exchange
- 声明queue
- 将queue与key、exchange绑定
然后用conn.Channel()和rabbitmq交互
go get github.com/rabbitmq/amqp091-go
收发模式2示例:
package mainimport ("fmt""github.com/streadway/amqp""time"
)func main() {conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}//durable 服务器重启还有queue autoDelete 自动删除 exclusive 独占连接,这个q别人连不上 noWait 是否等待返回的一些状态结果//关于queue的一些设置q, err := ch.QueueDeclare("go_q1", true, false, false, false, nil)if err != nil {panic(err)}// 开启消费者go consume("c1",conn, q.Name)go consume("c2",conn, q.Name)i := 0for {i++err := ch.Publish("", q.Name, false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("message %d", i)),})if err != nil {panic(err)}time.Sleep(200 * time.Millisecond)}
}func consume(name string,conn *amqp.Connection, q string) {ch, err := conn.Channel()if err != nil {panic(err)}msgs, err := ch.Consume(q,name,true, false,false,false,nil)if err != nil {panic(err)}for msg := range msgs {fmt.Printf("%s:%s\n",name,msg.Body)}
}
fanout模式示例:
package mainimport ("fmt""github.com/streadway/amqp""time"
)func main() {conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}err = ch.ExchangeDeclare("ex","fanout",true,false,false,false,nil)if err != nil {panic(err)}go subscribe(conn,"ex")go subscribe(conn,"ex")i := 0for {i++err := ch.Publish("ex", "", false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("message %d", i)),})if err != nil {panic(err)}time.Sleep(200 * time.Millisecond)}
}func subscribe(conn *amqp.Connection, ex string) {ch, err := conn.Channel()if err != nil {panic(err)}defer ch.Close()q, err := ch.QueueDeclare("", false, true, false, false, nil)if err != nil {panic(err)}defer ch.QueueDelete(q.Name, false,false,false)err = ch.QueueBind(q.Name,"",ex,false,nil)if err != nil {panic(err)}consume("c3",ch,q.Name)}func consume(name string,ch *amqp.Channel, q string) {msgs, err := ch.Consume(q,name,true, false,false,false,nil)if err != nil {panic(err)}for msg := range msgs {fmt.Printf("%s:%s\n",name,msg.Body)}
}
写代码的时候注意,收发消息,一定要在不同的channel进行,大家可以把channel认为是一个tcp连接的分割。建立exchang的channel可以进行发消息,不可以进行收消息
可以看到有一个exchange,对应2个queue。对应一条tcp连接(分成3个channel,1个向exchange发,2个从queue收)
routing(路由)模式示例
package mainimport ("fmt""github.com/streadway/amqp""strconv""time"
)const (exchangeName = "ex_routing"key1 = "key1"key2 = "key2"queueBindKey1 = "queue1"queueBindKey2 = "queue2"
)func main() {dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")conn, err := amqp.Dial(dsn)if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}InitMQ(ch,queueBindKey1,key1,exchangeName)InitMQ(ch,queueBindKey2,key2,exchangeName)go subscribe(conn, key1,queueBindKey1)go subscribe(conn, key2,queueBindKey2)i := 0for {i++sendMessage(ch,exchangeName,key1,strconv.Itoa(i))sendMessage(ch,exchangeName,key2,strconv.Itoa(i))time.Sleep(500 * time.Millisecond)}
}func InitMQ(ch *amqp.Channel, queue,key,exchange string) {// 声明 exchangeerr := ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)if err != nil {panic(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}}func sendMessage(ch *amqp.Channel, exchange string, key string,message string) {err := ch.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("send to %s, message: %v", key,message)),})if err != nil {panic(err)}}func subscribe(conn *amqp.Connection, key string,queue string) {ch, err := conn.Channel()if err != nil {panic(err)}defer ch.Close()key = fmt.Sprintf("%s haha",key)consume(key, ch, queue)
}func consume(name string, ch *amqp.Channel, queue string) {msgs, err := ch.Consume(queue, name, true, false, false, false, nil)if err != nil {panic(err)}for msg := range msgs {fmt.Printf("%s:%s\n", name, msg.Body)}
}
绑定图:
topic模式
是rabbitmq最高级模式了,没啥说的,重点就是,*
匹配1个
,#
匹配0或多个
package mainimport ("fmt""github.com/streadway/amqp""log""time"
)const (TopicExchange = "topicExchange"BindingKey1 = "*.*.red"BindingKey2 = "*.error.*"BindingKey3 = "shanghai.*.*"Queue1 = "queue1"Queue2 = "queue2"Queue3 = "queue3"RoutingKey1 = "beijing.error"RoutingKey2 = "shanghai.fatal.red"
)func main() {dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "用户名", "密码", "ip", port)conn, err := amqp.Dial(dsn)if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}InitMQ(ch, Queue1, BindingKey1, TopicExchange)InitMQ(ch, Queue2, BindingKey2, TopicExchange)InitMQ(ch, Queue3, BindingKey3, TopicExchange)ch2 := GenChannel(conn)go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))}})go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))}})go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))}})for {sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")time.Sleep(500 * time.Millisecond)}
}func GenChannel(conn *amqp.Connection) *amqp.Channel {ch, err := conn.Channel()if err != nil {log.Fatal(err)}return ch
}func InitMQ(ch *amqp.Channel, queue, key, exchange string) {// 声明 exchangeerr := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)if err != nil {panic(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}}func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {err := ch.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),})if err != nil {panic(err)}}func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {msgs, err := ch.Consume(queue, key, true, false, false, false, nil)if err != nil {panic(err)}callback(msgs, key)
}
高级操作
消费者确认模式:
将消费消息,设置为手动确认:
成功时确认:msg.Ack(false)
失败时消息处理方式:
不进行确认,会进入unacked,当消费者重启后,或者同一队列的其他消费者可以消费
重新入列
msg.Reject(true)
- 丢弃
msg.Reject(false)
package mainimport ("fmt""github.com/streadway/amqp""log""time"
)const (TopicExchange = "topicExchange"BindingKey1 = "*.*.red"BindingKey2 = "*.error.*"BindingKey3 = "shanghai.*.*"Queue1 = "queue1"Queue2 = "queue2"Queue3 = "queue3"RoutingKey1 = "beijing.error"RoutingKey2 = "shanghai.fatal.red"
)func main() {dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")conn, err := amqp.Dial(dsn)if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}InitMQ(ch, Queue1, BindingKey1, TopicExchange)InitMQ(ch, Queue2, BindingKey2, TopicExchange)InitMQ(ch, Queue3, BindingKey3, TopicExchange)ch2 := GenChannel(conn)go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))// false 拒绝重新入列,即丢弃//msg.Reject(false)// true 重新入列msg.Reject(true)}})go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))msg.Ack(false)}})go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))msg.Ack(false)}})cycleCount := 1for i:=0;i<cycleCount;i++ {sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")time.Sleep(500 * time.Millisecond)}select {}
}func GenChannel(conn *amqp.Connection) *amqp.Channel {ch, err := conn.Channel()if err != nil {log.Fatal(err)}return ch
}func InitMQ(ch *amqp.Channel, queue, key, exchange string) {// 声明 exchangeerr := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)if err != nil {panic(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}}func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {err := ch.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),})if err != nil {panic(err)}}func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {msgs, err := ch.Consume(queue, key, false, false, false, false, nil)if err != nil {panic(err)}callback(msgs, key)
}
消费限流
限制未ack的最多有5个,必须设置为手动ack才有效
示例:
package mainimport ("fmt""github.com/streadway/amqp""log""time"
)const (TopicExchange = "topicExchange"BindingKey1 = "*.*.red"BindingKey2 = "*.error.*"BindingKey3 = "shanghai.*.*"Queue1 = "queue1"Queue2 = "queue2"Queue3 = "queue3"RoutingKey1 = "beijing.error"RoutingKey2 = "shanghai.fatal.red"
)func main() {dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", xxxx)conn, err := amqp.Dial(dsn)if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}InitMQ(ch, Queue1, BindingKey1, TopicExchange)InitMQ(ch, Queue2, BindingKey2, TopicExchange)InitMQ(ch, Queue3, BindingKey3, TopicExchange)ch2 := GenChannel(conn)// 限制未ack的最多有5个,必须设置为手动ack才有效ch2.Qos(5,0,false)go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {go func(msg amqp.Delivery) {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))time.Sleep(time.Second * 5)msg.Ack(false)}(msg)}})go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))msg.Ack(false)}})go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))msg.Ack(false)}})cycleCount := 10for i:=0;i<cycleCount;i++ {sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")time.Sleep(500 * time.Millisecond)}select {}
}func GenChannel(conn *amqp.Connection) *amqp.Channel {ch, err := conn.Channel()if err != nil {log.Fatal(err)}return ch
}func InitMQ(ch *amqp.Channel, queue, key, exchange string) {// 声明 exchangeerr := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)if err != nil {panic(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}}func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {err := ch.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),})if err != nil {panic(err)}}func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {msgs, err := ch.Consume(queue, key, false, false, false, false, nil)if err != nil {panic(err)}callback(msgs, key)
}
延迟消息
借助rabbitmq-delayed-message-exchange
插件实现(需要先安装好)
package mainimport ("fmt""github.com/streadway/amqp""log""time"
)const (TopicExchange = "topicExchange"DelayExchange = "delayExchange"BindingKey1 = "*.*.red"BindingKey2 = "*.error.#"BindingKey3 = "shanghai.*.*"Queue1 = "queue1"Queue2 = "queue2"Queue3 = "queue3"RoutingKey1 = "beijing.error"RoutingKey2 = "shanghai.fatal.red"
)func main() {dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", 5672)conn, err := amqp.Dial(dsn)if err != nil {panic(err)}ch, err := conn.Channel()if err != nil {panic(err)}InitMQ(ch, Queue1, BindingKey1, TopicExchange)InitMQ(ch, Queue2, BindingKey2, TopicExchange)InitMQ(ch, Queue3, BindingKey3, TopicExchange)InitDelayMQ(ch, Queue2, "", DelayExchange)ch2 := GenChannel(conn)// 限制未ack的最多有5个,必须设置为手动ack才有效ch2.Qos(5, 0, false)go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {go func(msg amqp.Delivery) {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1, BindingKey1, string(msg.Body))msg.Ack(false)}(msg)}})go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Println(time.Now())fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2, BindingKey2, string(msg.Body))isFail := true// 如果失败发送延迟消息给if isFail {delay,ok := msg.Headers["x-delay"].(int32)if ok {delay = delay * 2fmt.Println(delay)}else{delay = 1000}sendDelayMessage(ch, DelayExchange, "", string(msg.Body), int(delay))msg.Reject(false)} else {msg.Ack(false)}}})go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {for msg := range msgs {fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3, BindingKey3, string(msg.Body))msg.Ack(false)}})// 设置confirm,发送端消息确认//var notifyConfirm chan amqp.Confirmation//SetConfirm(ch, notifyConfirm)//go ListenConfirm(notifyConfirm)//var notifyReturn chan amqp.Return//NotifyReturn(notifyReturn,ch)//go ListReturn(notifyReturn)cycleCount := 1for i := 0; i < cycleCount; i++ {fmt.Println(i)//sendDelayMessage(ch, DelayExchange, "", "beijing.error-----------------",3000)sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")//sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")time.Sleep(500 * time.Millisecond)}select {}
}func GenChannel(conn *amqp.Connection) *amqp.Channel {ch, err := conn.Channel()if err != nil {log.Fatal(err)}return ch
}func InitMQ(ch *amqp.Channel, queue, key, exchange string) {// 声明 exchangeerr := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)if err != nil {panic(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}}func InitDelayMQ(ch *amqp.Channel, queue, key, exchange string) {//申明交换机err := ch.ExchangeDeclare(exchange, "x-delayed-message",false, false, false, false,map[string]interface{}{"x-delayed-type": "direct"})if err != nil {log.Fatal(err)}// 声明 queue_, err = ch.QueueDeclare(queue, false, false, false, false, nil)if err != nil {panic(err)}// 将 queue 与 exchange 和 key 绑定err = ch.QueueBind(queue, key, exchange, false, nil)if err != nil {panic(err)}
}func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {err := ch.Publish(exchange, key, true, false, amqp.Publishing{Body: []byte(fmt.Sprintf("%v", message)),})if err != nil {panic(err)}}func sendDelayMessage(ch *amqp.Channel, exchange string, key string, message string, delay int) {err := ch.Publish(exchange, key, true, false, amqp.Publishing{Headers: map[string]interface{}{"x-delay": delay},Body: []byte(fmt.Sprintf("%v", message)),})if err != nil {panic(err)}}func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {msgs, err := ch.Consume(queue, key, false, false, false, false, nil)if err != nil {panic(err)}callback(msgs, key)
}func SetConfirm(ch *amqp.Channel, notifyConfirm chan amqp.Confirmation) {err := ch.Confirm(false)if err != nil {log.Println(err)}notifyConfirm = ch.NotifyPublish(make(chan amqp.Confirmation))
}func ListenConfirm(notifyConfirm chan amqp.Confirmation) {for ret := range notifyConfirm {if ret.Ack {fmt.Println("消息发送成功")} else {fmt.Println("消息发送失败")}}
}func NotifyReturn(notifyReturn chan amqp.Return, channel *amqp.Channel) {notifyReturn = channel.NotifyReturn(make(chan amqp.Return))
}
func ListReturn(notifyReturn chan amqp.Return) {ret := <-notifyReturnif string(ret.Body) != "" {fmt.Println("消息没有投递到队列:", string(ret.Body))panic("skfh")}
}
持久化
交换机持久化:
交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。
队列持久化
如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。
消息持久化
RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。
在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;
仅设置队列持久化,重启之后消息会丢失;
仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;
将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。
golang之消息队列rabbitmq相关推荐
- 初识消息队列/RabbitMQ详解
欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 今天来给大家分享 ...
- 消息队列RabbitMQ的使用
最近在学习spring cloud微服务,当学习到spring cloud bus的时候,涉及到了消息队列,需要学习RabbitMQ. 一.消息队列 1.1介绍消息队列 消息队列,即MQ,Messag ...
- 消息队列RabbitMQ入门与PHP实战
消息队列介绍以及消息队列应用场景 RabbitMQ 说明 MQ(Message Queue) 即消息队列,是应用间的通信方式,消息发送后可立即返回,由消息系统来确保消息的可靠传递."消息队列 ...
- 快速掌握消息队列RabbitMQ
※快速掌握消息队列RabbitMQ 一.RabbitMQ概述 (一)什么是消息队列MQ 消息队列(Message Queue),后文称MQ,是一种 跨进程的通信机制,用于上下游传递消息. MQ作为消息 ...
- 谷粒商城12——购物车模块、消息队列RabbitMQ
文章目录 十.购物车模块 1.需求分析 2.封装vo 3.添加商品 4.查询购物车 5.选中商品 6.在购物车修改商品数量 7.在购物车删除商品 十一.消息队列RabbitMQ 1.场景分析 2.概述 ...
- RabbitMQ总结(一)--消息队列RabbitMQ应答模式(自动、手动)
原文链接 消息队列RabbitMQ应答模式(自动.手动) 为了确保消息不会丢失,RabbitMQ支持消息应答.消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了.RabbitM ...
- 消息队列RabbitMQ之初学者
文章目录 消息队列 什么是消息队列 生产者和消费者 AMQP和JMS AMQP和JMS的区别 常见的MQ产品 RabbitMQ Erlang语言 RabbitMQ下载 什么是消息队列RabbitMQ? ...
- SpringCloud源码探析(六)-消息队列RabbitMQ
1.概述 RabbitMQ是一个开源的消息代理和队列服务器,它是基于Erlang语言开发,并且是基于AMQP协议的.由于Erlang语言最初使用与交换机领域架构,因此使得RabbitMQ在Broker ...
- 消息队列 RabbitMQ
前言 市面上的消息队列产品有很多,比如老牌的 ActiveMQ.RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,阿里巴巴捐赠给 Apache 的 RocketMQ ,连 red ...
最新文章
- [转]EXP-00056: 遇到 ORACLE 错误 31600
- 走向公共管理的治理理论
- 《大话数据结构》第9章 排序 9.10 总结回顾
- mysql系列之2.mysql多实例
- redis+aop防重复提交
- Myeclipse中Tomcat的两种部署方式
- 构建自己的PHP框架(ORM)
- Java基础---方法1
- 图解linux32位平台下进程线程长什么样子
- FINALDATA 使用教程
- 软件测试——开发模型、测试模型介绍
- 总会用到的系列2:你不理财财不理你的基金定投
- 中国银联在线支付接口开发——前台支付
- 上交计算机考研专业课,2018上交考研经验贴
- SVM——(三)对偶性和KKT条件(Lagrange duality and KKT condition)
- DVWA-XSS(Reflected) 全级别教程
- 在美国,男 / 女卫生间(厕所)的正确称呼为什么?请用英语写出答案。
- 文件复制-字节输入输出流的使用
- python已知两边求第三边_已知两边求第三边公式
- GIS标准分幅工具——制作图幅接合表