RabbitMQ

RabbitMQ的安装

RabbitMQ官网
系统版本

1
2
3
4
5
CaseZheng@VM_187_252_centos ~]$ uname -a
Linux VM_187_252_centos 3.10.0-693.17.1.el7.x86_64 #1 SMP Thu Jan 25 20:13:58 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

[CaseZheng@VM_187_252_centos ~]$ cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)

RabbitMQ基于Erlang,需要先安装Erlang环境

Erlang安装

Erlang的版本需要看下RabbitMQ的对应版本的要求
Erlang的下载页面

下载Erlang到本地

1
wget http://erlang.org/download/otp_src_21.0.tar.gz

解压并安装

1
2
3
4
tar zxvf otp_src_21.0.tar.gz
cd otp_src_21.0/
./configure
make && sudo make install

默认使用默认路径 如需更改需要在 ./configure 时指定 如:

1
./configure --prefix=/opt/erlang

输入erl有以下提示则为安装成功

1
2
3
4
[CaseZheng@VM_187_252_centos ~]$ erl
Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Eshell V10.0 (abort with ^G)
1>

RabbitMQ安装

下载RabbitMQ 这里安装rabbitmq的generic版本

1
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-generic-unix-3.7.7.tar.xz

解压

1
tar xvJf rabbitmq-server-generic-unix-3.7.7.tar.xz

移动 rabbitmq_server-3.7.7 到 /usr/local/ 并重命名为rabbitmq

1
sudo mv rabbitmq_server-3.7.7 /usr/local/rabbitmq

给/etc/profile加入环境变量如下:

1
export PATH=$PATH:/usr/local/rabbitmq/sbin

使环境变量生效

1
source /etc/profile

启用rabbitmq网页管理插件(该插件实际已随rabbitmq安装,启用即可)

1
rabbitmq-plugins enable rabbitmq_management

启动rabbitmq

1
2
rabbitmq-server -detached
ERROR: epmd error for host bogon: timeout (timed out)

出现此报错信息是因为解析不了主机名 修改hosts 将主机名解析到127.0.0.1或者当前机器的内网地址

查看启动成功

1
2
3
4
5
[CaseZheng@VM_187_252_centos ~]$ netstat -tunlp | grep beam
tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 31942/beam.smp
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 31942/beam.smp
tcp6 0 0 :::5672 :::* LISTEN 31942/beam.smp
udp 0 0 0.0.0.0:46173 0.0.0.0:* 31942/beam.smp

添加用户 设置用户组 授权

1
2
3
rabbitmqctl add_user root 123456
rabbitmqctl set_user_tags root administrator
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

可通过外网地址+15672端口号的形式访问web管理界面,需要开放15672端口

RabbitMQ介绍

RabbitMQ是由Erlang语言开发的AMQP的开源实现
AMQP(Advanced Message Queuing Protocol 高级消息队列协议)是一个开放的应用层协议,为面对消息的中间件而设计,基于此协议的客户端和消息中间件可传递消息.

RabbitMQ结构图

AMQP模型简介

1
2
3
4
5
                   |---------------------------------|
| |
生产者 ---publish--|-> Exchange ---Routes---> Queue -|--consumes---> 消费者
| |
|-------- Brokers(消息代理) ------|

生产者发布消息到交换机,交换机根据路由规则将消息转发到消息队列,消息队列从消息队列取数据消费.
生产者可以在发布消息时给消息指定各种消息属性.消息的属性有的会被消息代理(brokers)使用,有的则完全不透明,只能由被接收消息的应用所使用.
由于网络是不可靠的,而且消费者处理消息也可能失败,为了保证消息可靠的被处理掉,AMQP包含了消息确认机制,当消息从队列投递到消费者后,消费者需通知消息代理,进行确认,这个确认操作可以是自动的,也可以由处理消息的消费者执行.当消息确认机制启用时,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认信息.

交换机和交换机类型

交换机负责消息的路由,使用的路由算法由交换机和绑定规则决定.

交换机类型(Exchange Type) 预声明的默认名称
Direct exchange (Empty string) 或 amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match

交换机除类型外,在申明交换机时还有许多其它属性,主要的有:

  • Name
  • Durability 消息代理重启后,交换机是否存在
  • Auto-delete 在所有与之绑定的消息队列都完成对此交换机的使用后,删除它

交换机有两个状态:持久(durable)和暂存(transient),持久化的交换机在消息代理重启后依然存在,而暂存的交换机则不会.

默认交换机

默认交换机是一个由消息代理预先声明的名字为空字符串的amq.direct交换机

Direct exchange(直连交换器 路由键完全匹配)

direct exchange是根据消息携带的路由键将消息投递给对应队列的.direct exchange用来处理消息的单播路由

  1. 将一个队列绑定在某个交换机上,同时绑定一个路由键
  2. 当一个携带路由键R的消息被发送到直连交换机时,交换机会将该消息路由给绑定值同样为R的队列
    direct交换器当路由键完全匹配队列绑定键才路由消息到绑定的队列

Fanout exchange(扇形交换器 不理会路由键)

fanout exchange将消息路由给绑定在它身上的所有队列,不理会绑定的路由键.如果N个队列绑定在某个扇形交换机上,当有消息发送到此扇形交换机时,交换机将消息的拷贝发送给所有的N个队列.funout exchange用来处理消息的广播路由.

Topic exchange(路由键部分匹配)

topic exchange通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列.topic exchange经常用来实现各种分发/订阅模式及其变种.topic exchange用来实现消息的多播路由.
路由键部分匹配,路由键的约定如下:

  1. 路由键为一个句点号”.”分隔的字符串(被”.”分割的每一段独立的字符串称为一个单词),例如:”product.conf.del”, “price.conf.add”
  2. 绑定键和路由键都是”.”分隔的字符串
  3. 绑定键可以存在两种特殊字符”“和”#”,用于模糊匹配,”“用于匹配一个单词,”#”用于匹配多个单词(可以匹配零个)

Headers exchange

消息的路由操作如果涉及到多个属性,使用消息头比用路由键更容易表达,headers exchange使用多个消息属性代替路由键建立路由规则.通过判断消息头的值能否与指定的绑定相匹配来确定路由规则.

队列

队列(queue)存储着即将被应用消费掉的消息.
队列属性:

  1. Name
  2. Durable(消息代理重启后,队列依旧存在)
  3. Exclusive(只被一个连接使用,当连接关闭后队列即被删除)
  4. Auto-delete(当最后一个消费者退订后即被删除)
  5. Arguments

队列在声明(declare)后才能被使用.如果一个队列尚不存在,声明一个队列会创建它.如果声明的队列已经存在,并且属性完全相同.那么此次声明不会对原有队列产生任何影响.如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出

队列名称

队列名称可以指定,也可以由消息代理生成,队列名称是最多255字节的一个utf-8字符串.如果想让消息代理生成队列名,需要给消息队列的name参数赋值一个空字符串(在统一个通道的后续方法中可以使用空字符串表示之前生成的队列名称,因为通道可以默默记住消息代理最后一次生成的队列名称).
以”amq.”开始的队列名称被预留给消息代理内部使用.

队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理重启时它依然存在,没有被持久话的队列被称为暂存队列(Transient queues).
持久化队列并不会使得路由到它的消息也持久化,只有持久化的消息才能在消息代理重启后重新恢复.

绑定

绑定是交换机将消息路由给队列所需遵循的规则.
AMQP的消息无法路由到队列,消息会被就地销毁或者返还给发布者.如何处理取决于发布者设置的消息属性

消费者

消费者有两种方式消费数据:

  1. 消息代理主动将消息投递给消费者(push API)
  2. 消费者根据需要主动获取消息(pull API)

消息确认

消息代理删除消息的时机:

  1. 当消息代理将消息发送给应用后立即删除(自动确认)
  2. 待应用发送一个确认回执后再删除(显式确认)

拒绝消息

消费者可以在处理某条消息后向消息代理表明如何处理该消息,销毁它或者重新放入队列.

消息属性

  1. Content type(内容类型)
  2. Content encoding(内容编码)
  3. Routing key(路由键)
  4. Delivery mode(persistent or not)投递模式(持久化或非持久化)
  5. Message priority(消息优先权)
  6. Message publishing timestamp(消息发布的时间戳)
  7. Expiration period(消息有效期)
  8. Publisher application id(发布应用ID)

连接

AMQP连接通常为长连接,AMQP是一个使用TCP提供可靠投递的应用层协议.AMQP使用认证机制并且提供TLS(SSL)保护.当一个应用不再需要连接到AMQP代理时,需要优雅的释放AMQP连接,而不是直接关闭TCP连接.

通道

一个应用可能会和AMQP代理建立多个连接,此时同时开启多个TCP连接不合适,会消耗过多的系统资源,AMQP提供通道来处理多连接,可以将通道理解为共享一个TCP的多个轻量化连接.在多线程/多进程应用中可为每个线程/进程开启一个通道,这些通道不能被线程/进程共享.
一个特定通道上的通讯和其它通道上的通信是完全隔离的,因此每个AMQP方法都需要携带一个通道号.

虚拟主机

为了在一个单独的AMQP代理上实现多个隔离环境(用户、用户组、交换机、队列等),AMPQ提供虚拟主机概念(vhosts),为AMQP实体提供完全隔离的环境.当连接建立时,AMQP客户端指定使用的虚拟主机.

AMQP可扩展

RabbitMQ c客户端的使用

c客户端编译安装

编译安装看git说明即可rabbitmq-c git网址

api说明

api可查看官方文档或头文件注释rabbitmq-c 官方文档

原始c接口的封装

rabbitmq-c 原始c接口的简单封装

参考文档