一 、简介

1. 概念

从整体上讲Rabbitmq就是一个生产者消费者的模型.

我们将中间的整个broker就当做是一个消息中间件的实体就可以了.

单从这个方面上讲,生产者发送消息到broker上面,然后消费者从broker之中获取数据,最终完成数据的通信任务.

(1). virtual host

在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

也就是说,我们只要知道一个vhost就是存储一堆的exchange和queue的存储空间就可以 了.

(2). exchange(交换机)

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

在Rabbitmq之中,现在一般可以划分成四种类型,这四种类型我们称为Rabbitmq最基本的类型。

  • Direct

    直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

  • fanout

    广播是式交换器,不管消息的ROUTING_KEY设置为什么 (忽略routing_key),Exchange都会将消息转发给所有绑定的Queue。

  • topic

    主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock,* . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)

  • headers

    消息体的header匹配(ignore)

(3). 消息队列queues

消息队列就是最终存储消息的地方,也是整个消息中间件之中最为重要的一个地方,消息队列和消费者联系,决定消费者到底能够获取到什么样的消息.

多个消费者可以订阅一个队列的内容,但是队列会按照轮询的方式进行消息的发送,我们在后面有办法改变这样的行为.

只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的,即在建立队列之前,发出的所有消息都被丢弃了。

(4). 联系

在整个Rabbitmq之中最为复杂的一个概念,路由键,绑定建和绑定的概念了.

  • routing key : 路由键

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

    在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

    RabbitMQ为routing key设定的长度限制为255 bytes

  • Binding : 绑定

    所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系

  • Binding key: 绑定建

    在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。

    在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。

    binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

2. 消息的处理

两种方式:

  1. 一次性。用 $q->get([…]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
  2. 阻塞。用 $q->consum( callback, […] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。

3. channel 和 connection的关系

rabbitmq的connections 展示的是一个程序的连接,这个程序和rabbitmq的connection只会有一条,channel是建立在connection连接之上的一个虚拟连接,就像一条马路的4行道,connection是马路,channel是马路上的一条行道,这个比喻也不是很恰当,channel是双向的.

4. exchange的类型举例

  1. direct类型:

    在上面的图中,描述了一个direct类型的exchange.

    对于direct类型的交换机,它只会讲消息交给路由键和消息队列一致的消息队列.

    比如,上面的图中定义了多个绑定,当发送:

    ​ 路由键为info的消息的时候,消息会被路由到队列2上面.

    ​ 路由键为waring的消息的时候,消息就会被路由到队列1和队列2上面.

    从这个上面,我们也能够知道发送一个消息我们需要制定一个交换机,然后制定一个路由键.然后交互机会根据自己的类型和消息中附带的路由键找到对应的消息队列,是否真正的发送需要和绑定建进行比对.

  2. fanout类型

    fanout类型是一个最直接的类型,只要消息到达这样的交互机,它什么都不管,只要有队列和我有联系,我就将消息交给他.

    如上图,一个消息到达了交换机,它上面都不管,直接将消息交给了有连接的队列之中.

  3. topic交换机

      这种交互机比较器之前的类型,就是多了一个模糊匹配的功能.

      在Rabbitmq之中,使用了#和*代表模糊查找的占位符.

      其中:#表示多个单词 ,*表示一个单词.

    我们看看上面图:

      当我们的消息的路由键的值为usa.news,那么消息可以成功了路由到队列1和队列2.

      当为usa.weather的时候,会路由到队列1队列3. 

5. 为什么使用信道channel

大家都知道,在使用rabbitmq时不管是消费还是生产都需要创建信道(channel) 和connection(连接),如下图producer示例。我们完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢,试想这样一个场景,一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程都把持一个信道,所以信道复用了TCP连接。同时rabbitmq可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的connection可以再产生性能瓶颈的情况下有效地节省tcp连接资源,但是当信道本身的流量很大时,这时候多个信道复用一个connection就会产生性能瓶颈,进而是整体的流量被限制了。此时就需要开辟多个connection,将这些信道均摊到这些connection中,至于这些相关调优策略需要根据业务自身的实际情况进行调节。

6. 端口号解析

  • 4369 (epmd), 25672 (Erlang distribution)

    Epmd 是 Erlang Port Mapper Daemon 的缩写,在 Erlang 集群中相当于 dns 的作用,绑定在4369端口上。

  • 5672, 5671 (AMQP 0-9-1 without and with TLS)

    AMQP 是 Advanced Message Queuing Protocol 的缩写,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,专为面向消息的中间件设计。基于此协议的客户端与消息中间件之间可以传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现RabbitMQ 等。

  • 15672 (if management plugin is enabled)

    通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。(注意:RabbitMQ 3.0之前的版本默认端口是55672,下同)

  • 61613, 61614 (if STOMP is enabled)

    Stomp 是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档,实践一下 Stomp 协议需要:

    一个支持 stomp 消息协议的 messaging server (譬如activemq,rabbitmq);
    一个终端(譬如linux shell);
    一些基本命令与操作(譬如nc,telnet)

  • 1883, 8883 (if MQTT is enabled)
    MQTT 只是 IBM 推出的一个消息协议,基于 TCP/IP 的。两个 App 端发送和接收消息需要中间人,这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是 MQTT.

7. 持久化

  1. RabbitMQ消息持久化分为消息持久化、队列持久化和交换器持久化,不管持久化还是非持久化消息都可以被写入磁盘,区别在于重启后消息是否还存在.

  2. 队列持久化是通过定义队列是durable参数指定的.

    交换器持久化和队列持久化一样,也是通过在定义时durable参数指定.

    消息持久化是通过消息的属性deliverymode来设置是否持久化

  3. RabbitMQ内存/磁盘控制:内存使用超过阈值后,Rabbit会暂时阻塞客户端的连接,停止接收消息,可以通过管理命令临时调整阈值

    rabbitmqctl set_vm_memery_high_watermark ,fraction为内存阈值,默认为0.4;

    或者通过改配置文件在/etc/rabbitmq/rabbitmq.conf添加 vm_memery_high_watermark.relative = 0.5(相对值) /absolute = 1GB (绝对值)

  4. 内存换页:在某个rabbitmq节点内存到达阈值之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会被转储到磁盘中,同时会将持久化的消息在内存中清除,内存换页阈值设置 vm_memery_high_watermark_paging_ratio=0.75

  5. 磁盘阈值:当磁盘剩余空间小于默认值时,RabbitMQ同样会阻塞生产者,默认值为50M,但是并不能完全消除RabbitMQ崩溃的可能性(因为磁盘扫描是有时间间隔的)。

管理命令 rabbitmq set_disk_free_limit <8GB> /mem_relative 2.0(与内存的相对比值)

配置文件 disk_free_limit.relative =2.0 /disk_free_limit.absolute = 50M

管理控制台描述:https://www.cnblogs.com/coder-programming/p/11382322.html

二、安装

1. docker单节点

docker run -d --hostname rabbit \
    --name rabbit \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=admin \
    -p 5672:5672 \
    -p 15672:15672 \
    -p 61613:61613 \
    repository.jcfc.cn:9445/rabbitmq:3.6.12-management

2. 单节点安装

  • 下载Erlang的rpm包

    RabbitMQ是Erlang语言编写,所以Erang环境必须要有,注:Erlang环境一定要与RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html

    下载地址:https://bintray.com/rabbitmq-erlang/rpm/erlang(根据自身需求及匹配关系,下载对应rpm包)

  • 下载RabbitMQ的rpm包

    https://bintray.com/rabbitmq/rpm/rabbitmq-server

  • 下载socat:

    http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

  • 分别安装Erlang、Socat、RabbitMQ(一定按照顺序!)

      rpm -ivh erlang-.*

  rpm -ivh socat-.*

  rpm -ivh rabbitmq-server-.*

  • 配置rabbitmq:vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

    改为 : {loopback_users,[]}

  • 安装管理插件:

    rabbitmq-plugins enable rabbitmq_management

  • 启动

默认用户:guest,guest

3. 集群安装

(1)集群搭建

  1. 配置环境host,各个节点的host都需要包含每个节点的信息,信息要一致
  2. 各台机器的rabbit-server安装好。
  3. 保证 /var/lib/rabbitmq/.erlang.cookie 和 $HOME/.erlang.cookie 各服务器文件内容一致,且权限600。
  4. 后台启动服务:systemctl start rabbitmq-server或者 rabbitmq-server -detached , 然后停止所有机器的rabbitmq应用:rabbitmqctl stop_app
  5. 在其中一台(认定为主)添加节点: rabbitmqctl join_cluster rabbit@${其他服务器名} //默认是磁盘节点,如果是内存节点的话,需要加–ram参数
  6. 启动所有机器的rabbitmq的服务:rabbitmqctl start_app
  7. 重新添加用户密码、并授权。

(2)集群模式

  • 普通模式

默认模式。以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

  • 镜像模式

把需要的队列做成镜像队列,存在与多个节点属于RabbitMQ的HA方案该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

镜像模式要依赖policy模块。就是要设置,那些Exchanges或者queue的数据需要复制,同步。

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

参数意思为:

ha-all:为策略名称。

^:为匹配符,只有一个^代表匹配所有,^zlh为匹配名称为zlh的exchanges或者queue。

ha-mode:为匹配类型,他分为3种模式:all-所有(所有的queue),exctly-部分(需配置ha-params参数,此参数为int类型比如3,众多集群中的随机3台机器),nodes-指定(需配置ha-params参数,此参数为数组类型比如[“rabbit@F”,”rabbit@G”]这样指定为F与G这2台机器。)。

  • 其他插件实现高可用

可以借助插件 shovel、federation来实现高可用,但是不常用,下面是两种方式的区别

三 、配置优化

四、管理

1. 用户管理

用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。

  • 新增用户:
rabbitmqctl  add_user  Username  Password
  • 删除一个用户:
rabbitmqctl  delete_user  Username
  • 修改用户的密码:
rabbitmqctl  change_password  Username  Newpassword
  • 查看当前用户列表:
rabbitmqctl  list_users
  • 验证用户密码
    rabbitmqctl authenticate_user Username  Password
    

2. 角色管理

rabbitmq用户角色分为Administrator、Monitoring、Policymaker、Management、Impersonator、None共六种角色。

  1. Administrator
    超级管理员,可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作,因为是超级管理员,可以这样理解,它可以为所欲为,什么操作都能干,删除用户、修改用户密码、重置用户角色、策略制定等等。

  2. Monitoring
    监控者,可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。management可以做的任何事外加:
    列出所有virtual hosts,包括他们不能登录的virtual hosts
    查看其他用户的connections和channels
    查看节点级别的数据如clustering和memory使用情况
    查看真正的关于所有virtual hosts的全局的统计信息

  3. Policymaker
    策略制定者,可登陆管理控制台(启用management plugin的情况下),同时可以对policy进行管理。但无法查看节点的相关信息。

management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters

  1. Management
    普通管理者,仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

    列出自己可以通过AMQP登入的virtual hosts
    查看自己的virtual hosts中的queues, exchanges 和 bindings
    查看和关闭自己的channels 和 connections
    查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

  2. Impersonator
    模拟者,无法登录管理控制台,因为没有管理者权限.

  3. None
    其他用户,无法登陆管理控制台,通常就是普通的生产者和消费者。

  • 给用户设置标签 none management monitoring administrator 多个用,分隔
#rabbitmqctl set_user_tags {username} {tag ...}
rabbitmqctl set_user_tags admin administrator

3. 权限管理

用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限:

  • 配置权限会影响到exchange,queue的声明和删除。
  • 读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。
  • 设置用户权限:
    ConfP WriteP ReadP 部分是三个正则表达式,用来匹配要赋权的资源名称
rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP
# 例子:
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  • 查看(指定hostpath)所有用户的权限信息:
rabbitmqctl  list_permissions  [-p  VHostPath]
  • 查看指定用户的权限信息:
rabbitmqctl  list_user_permissions  User
  • 清除用户的权限信息:
rabbitmqctl  clear_permissions  [-p VHostPath]  User

五 、开发使用

发表评论

邮箱地址不会被公开。