Go 接入RabbitMq实操
基本概念
什么是消息队列
消息队列是一种应用(进程)间的通信方式。
生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。
消息队列有什么用
消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的操作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。
比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的操作。
常用的消息队列
常用的MQ大概有ActiveMQ、RabbitMQ、RocketMQ、Kafka
ActiveMQ,基于Java
优点:对Java的JMS支持最好;多线程并发;
缺点:历史悠久,版本更新慢。现在慢慢用的少了;
RabbitMQ,基于Erlang
优点:生态丰富,是现在主流的MQ;支持多种客户端、支持AJAX;
缺点:对想深入源码的Java选手不太友好;
RocketMQ,基于Java
优点:为海量数据打造;主张拉模式;天然集群、HA、负载均衡;
缺点:生态较小
Kafka,基于Scala
优点:分布式高可拓展;高性能;容错强
缺点:消息重复;乱序;维护成本高
什么是RabbitMQ
消息中间件
erlang:一种并发函数式语言
AMQP:Advanced Message Queuing Protocol,高级消息队列协议。由Exchange、Queue和Bind组成
RabbitMQ是一个erlang开发的AMQP实现
生产者将消息发送到Exchange上,通过Exchange从而Binding到Queues上。
Exchange有三种具体类型:
direct:如果消息中的RoutingKey和Binding中的BindingKey一致就转发
fanout:消息被分发到所有队列中
topic:将RoutingKey和队列的模式进行匹配
应用场景
异步
可以理解为将遇到非必须的业务时,立即响应客户端,不关系业务何时完成
比如在用户注册时,有将信息写入数据库和发送注册成功邮件两项业务。
数据库写入完成即标志着用户注册成功,此时如果继续处理发送邮件的业务,会给客户端带来不必要的等待时间。引入消息队列后,在队列中写入完成注册的消息后,即可完成整个注册流程。至于邮件,可以等到邮件业务从消息队列中取出消息再发送。
把不紧急的业务从主线中剥离出来,主线不必考虑不紧急的业务何时完成的时候,可以考虑使用消息队列实现异步。
解耦
考虑两个系统间存在消息传递,一个系统的故障会影响到整个业务的正常运转。可以用消息队列来保证消息可靠传递
比如一个订单系统和一个库存系统,完成订单之后,需要进行库存调度。考虑到如果库存系统故障,会引起已完成的订单消息的丢失,而做很多异常处理会使业务变得臃肿。这个时候可考虑引入消息队列,使用消息队列保证可靠传输,从而减少业务逻辑。
削峰
考虑短时间的大量请求,可能会带来内存溢出、大面积连接超时等情况,使得服务器崩溃。引入消息队列后,可以控制请求到业务处理系统的流量,从而防止崩溃现象的出现。
比如秒杀场景。大量请求同时涌入,服务器不能分配足够的资源响应,或者带宽不足,导致宕机。可以引入消息队列来限流,MQ通过限制同一时间的出口消息,使得流量在服务器能够承受的范围之内。等待一部分请求处理完成之后,再向业务处理系统导入新的消息。
----------------------------------------------------------------------------------------------------------------------------------------
昨天我们用docker在虚拟机上装了RabbitMq,今天我们就开始用它来实际操作一下。
不说废话了,我们开始搞,我这里用的是Go语言。
首先,我们先封装方法:
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
)
type RabbitMq struct {
Conn *amqp.Connection
Ch *amqp.Channel
QueueName string // 队列名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
RoutingKey string // routingKey
}
type QueueAndExchange struct {
QueueName string // 队列名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
RoutingKey string // routingKey
}
func (r *RabbitMq) ConnMq() {
conn, err := amqp.Dial("amqp://admin:123456@192.168.11.66:5672/my_vhost")
if err != nil {
fmt.Printf("连接mq出错,错误信息为:%v\n", err)
return
}
r.Conn = conn
}
func (r *RabbitMq) CloseMq() {
err := r.Conn.Close()
if err != nil {
fmt.Printf("关闭连接出错,错误信息为:%v\n", err)
return
}
}
// 开启channel通道
func (r *RabbitMq) OpenChan() {
ch, err := r.Conn.Channel()
if err != nil {
fmt.Printf("开启channel通道出错,错误信息为:%v\n", err)
return
}
r.Ch = ch
}
// 关闭channnel通道
func (r *RabbitMq) CloseChan() {
err := r.Ch.Close()
if err != nil {
fmt.Printf("关闭channel通道出错,错误信息为:%v\n", err)
}
}
// 生产者
func (r *RabbitMq) PublishMsg(body string) {
ch := r.Ch
// 创建队列
ch.QueueDeclare(r.QueueName, true, false, false, false, nil)
// 创建交换机
ch.ExchangeDeclare(r.ExchangeName, r.ExchangeType, true, false, false, false, nil)
// 队列绑定交换机
ch.QueueBind(r.QueueName, r.RoutingKey, r.ExchangeName, false, nil)
// 生产任务
ch.Publish(r.ExchangeName, r.RoutingKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
DeliveryMode: amqp.Persistent,
})
}
// 创建实例
func NewRabbitMq(qe QueueAndExchange) RabbitMq {
return RabbitMq{
QueueName: qe.QueueName,
ExchangeName: qe.ExchangeName,
ExchangeType: qe.ExchangeType,
RoutingKey: qe.RoutingKey,
}
}
接下来我们就我们创建的生产者发送消息:
package main
import (
"test_rabbitmq/rabbitmq"
"test_rabbitmq/utils"
)
func main() {
qe := rabbitmq.QueueAndExchange{
QueueName: "test_queue",
ExchangeName: "test_exchange",
ExchangeType: "direct",
RoutingKey: "test_routingKey",
}
mq := rabbitmq.NewRabbitMq(qe)
mq.ConnMq()
mq.OpenChan()
defer func() {
mq.CloseMq()
}()
defer func() {
mq.CloseChan()
}()
test_map := map[string]interface{}{
"mail": "9527@qq.com",
"msg": "今天大太阳",
}
//这里我们发送100条消息
for i := 0; i < 100; i++ {
mq.PublishMsg(utils.MapToStr(test_map))
}
}
我们可以看到,发送的消息已经在队列当中了。
然后我们开始消费消息:
package main
import (
"fmt"
"net/http"
"github.com/gin-gonic/gin"
"github.com/streadway/amqp"
)
func main() {
router := gin.Default()
router.GET("/", func(c *gin.Context) {
c.String(http.StatusOK, "Hello World")
})
router.Run(":8081")
}
func init() {
conn, err := amqp.Dial("amqp://admin:123456@192.168.11.66:5672/my_vhost")
fmt.Println(err)
defer conn.Close()
ch, err_ch := conn.Channel()
fmt.Println(err_ch)
defer ch.Close()
ch.Qos(1, 0, false)
deliveries, err := ch.Consume("test_queue", "consumer", false, false, false, false, nil)
if err != nil {
fmt.Println(err)
}
//消费成功delivery.Ack(true)
for delivery := range deliveries {
delivery.Ack(true)
body := string(delivery.Body)
fmt.Println(body)
fmt.Printf("%T\n", body)
}
}
然后我们可以看到数据已经取出来了,队列的消息也已经消费了。
还没有评论,快来发表第一个评论吧