Go 接入RabbitMq实操

基本概念

什么是消息队列

消息队列是一种应用(进程)间的通信方式。

生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。


消息队列有什么用

消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的操作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。

比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的操作。


常用的消息队列

常用的MQ大概有ActiveMQ、RabbitMQ、RocketMQ、Kafka


ActiveMQ,基于Java

优点:对Java的JMS支持最好;多线程并发;

缺点:历史悠久,版本更新慢。现在慢慢用的少了;


RabbitMQ,基于Erlang

优点:生态丰富,是现在主流的MQ;支持多种客户端、支持AJAX;

缺点:对想深入源码的Java选手不太友好;


RocketMQ,基于Java

优点:为海量数据打造;主张拉模式;天然集群、HA、负载均衡;

缺点:生态较小


Kafka,基于Scala

优点:分布式高可拓展;高性能;容错强

缺点:消息重复;乱序;维护成本高


什么是RabbitMQ

消息中间件

erlang:一种并发函数式语言

AMQP:Advanced Message Queuing Protocol,高级消息队列协议。由Exchange、Queue和Bind组成

RabbitMQ是一个erlang开发的AMQP实现

生产者将消息发送到Exchange上,通过Exchange从而Binding到Queues上。

Exchange有三种具体类型:

direct:如果消息中的RoutingKey和Binding中的BindingKey一致就转发

fanout:消息被分发到所有队列中

topic:将RoutingKey和队列的模式进行匹配

应用场景

异步

可以理解为将遇到非必须的业务时,立即响应客户端,不关系业务何时完成

比如在用户注册时,有将信息写入数据库和发送注册成功邮件两项业务。

数据库写入完成即标志着用户注册成功,此时如果继续处理发送邮件的业务,会给客户端带来不必要的等待时间。引入消息队列后,在队列中写入完成注册的消息后,即可完成整个注册流程。至于邮件,可以等到邮件业务从消息队列中取出消息再发送。

把不紧急的业务从主线中剥离出来,主线不必考虑不紧急的业务何时完成的时候,可以考虑使用消息队列实现异步。


解耦

考虑两个系统间存在消息传递,一个系统的故障会影响到整个业务的正常运转。可以用消息队列来保证消息可靠传递

比如一个订单系统和一个库存系统,完成订单之后,需要进行库存调度。考虑到如果库存系统故障,会引起已完成的订单消息的丢失,而做很多异常处理会使业务变得臃肿。这个时候可考虑引入消息队列,使用消息队列保证可靠传输,从而减少业务逻辑。


削峰

考虑短时间的大量请求,可能会带来内存溢出、大面积连接超时等情况,使得服务器崩溃。引入消息队列后,可以控制请求到业务处理系统的流量,从而防止崩溃现象的出现。

比如秒杀场景。大量请求同时涌入,服务器不能分配足够的资源响应,或者带宽不足,导致宕机。可以引入消息队列来限流,MQ通过限制同一时间的出口消息,使得流量在服务器能够承受的范围之内。等待一部分请求处理完成之后,再向业务处理系统导入新的消息。

----------------------------------------------------------------------------------------------------------------------------------------

昨天我们用docker在虚拟机上装了RabbitMq,今天我们就开始用它来实际操作一下。

不说废话了,我们开始搞,我这里用的是Go语言。

首先,我们先封装方法:

package rabbitmq

import (
	"fmt"

	"github.com/streadway/amqp"
)

type RabbitMq struct {
	Conn         *amqp.Connection
	Ch           *amqp.Channel
	QueueName    string // 队列名称
	ExchangeName string // 交换机名称
	ExchangeType string // 交换机类型
	RoutingKey   string // routingKey
}

type QueueAndExchange struct {
	QueueName    string // 队列名称
	ExchangeName string // 交换机名称
	ExchangeType string // 交换机类型
	RoutingKey   string // routingKey
}

func (r *RabbitMq) ConnMq() {
	conn, err := amqp.Dial("amqp://admin:123456@192.168.11.66:5672/my_vhost")
	if err != nil {
		fmt.Printf("连接mq出错,错误信息为:%v\n", err)
		return
	}
	r.Conn = conn

}

func (r *RabbitMq) CloseMq() {
	err := r.Conn.Close()
	if err != nil {
		fmt.Printf("关闭连接出错,错误信息为:%v\n", err)
		return
	}

}

// 开启channel通道
func (r *RabbitMq) OpenChan() {
	ch, err := r.Conn.Channel()
	if err != nil {
		fmt.Printf("开启channel通道出错,错误信息为:%v\n", err)
		return
	}
	r.Ch = ch
}

// 关闭channnel通道
func (r *RabbitMq) CloseChan() {
	err := r.Ch.Close()
	if err != nil {
		fmt.Printf("关闭channel通道出错,错误信息为:%v\n", err)
	}
}

// 生产者
func (r *RabbitMq) PublishMsg(body string) {

	ch := r.Ch
	// 创建队列
	ch.QueueDeclare(r.QueueName, true, false, false, false, nil)

	// 创建交换机
	ch.ExchangeDeclare(r.ExchangeName, r.ExchangeType, true, false, false, false, nil)

	// 队列绑定交换机
	ch.QueueBind(r.QueueName, r.RoutingKey, r.ExchangeName, false, nil)

	// 生产任务
	ch.Publish(r.ExchangeName, r.RoutingKey, false, false, amqp.Publishing{
		ContentType:  "text/plain",
		Body:         []byte(body),
		DeliveryMode: amqp.Persistent,
	})
}

// 创建实例
func NewRabbitMq(qe QueueAndExchange) RabbitMq {
	return RabbitMq{
		QueueName:    qe.QueueName,
		ExchangeName: qe.ExchangeName,
		ExchangeType: qe.ExchangeType,
		RoutingKey:   qe.RoutingKey,
	}
}


接下来我们就我们创建的生产者发送消息:

package main

import (
	"test_rabbitmq/rabbitmq"
	"test_rabbitmq/utils"
)

func main() {

	qe := rabbitmq.QueueAndExchange{
		QueueName:    "test_queue",
		ExchangeName: "test_exchange",
		ExchangeType: "direct",
		RoutingKey:   "test_routingKey",
	}
	mq := rabbitmq.NewRabbitMq(qe)

	mq.ConnMq()
	mq.OpenChan()

	defer func() {
		mq.CloseMq()
	}()
	defer func() {
		mq.CloseChan()
	}()

	test_map := map[string]interface{}{
		"mail": "9527@qq.com",
		"msg":  "今天大太阳",
	}

	//这里我们发送100条消息
	for i := 0; i < 100; i++ {
		mq.PublishMsg(utils.MapToStr(test_map))
	}

}

我们可以看到,发送的消息已经在队列当中了。


然后我们开始消费消息:

package main

import (
	"fmt"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/streadway/amqp"
)

func main() {
	router := gin.Default()
	router.GET("/", func(c *gin.Context) {
		c.String(http.StatusOK, "Hello World")
	})

	router.Run(":8081")
}

func init() {

	conn, err := amqp.Dial("amqp://admin:123456@192.168.11.66:5672/my_vhost")
	fmt.Println(err)
	defer conn.Close()

	ch, err_ch := conn.Channel()
	fmt.Println(err_ch)
	defer ch.Close()

	ch.Qos(1, 0, false)

	deliveries, err := ch.Consume("test_queue", "consumer", false, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	
	//消费成功delivery.Ack(true)

	for delivery := range deliveries {
		delivery.Ack(true)
		body := string(delivery.Body)
		fmt.Println(body)
		fmt.Printf("%T\n", body)
	}
}


然后我们可以看到数据已经取出来了,队列的消息也已经消费了。

点赞5
点击评论0
收藏2
浏览 79
 

还没有评论,快来发表第一个评论吧

免责声明:凡在本网站出现的信息,均仅供参考。本网站将尽力确保所提供信息的准确性及可靠性,但不保证有关资料的准确性及可靠性,注册用户和一般页面游览者在使用前请进一步核实,并对任何自主决定的行为负责。本网站对有关资料所引致的错误、不确或遗漏,概不负任何法律责任(包括侵权责任、合同责任和其它责任)
*尊重作者,转载请注明出处!

创作内容

开启你的爱凌峰创作之旅

发布首篇内容,开通创作中心
快来成为爱凌峰创作者吧~

写文章

板块热门【Golang】