威尼斯人线上娱乐

初识RabbitMQ类别之3

12 4月 , 2019  

网上参考大神们的博客,自身做了三个RabbitMQ即时发音信的德姆o。

话不多说,直接上代码!

注:这份文书档案是本身和多少个对象学习后1起形成的。

威尼斯人线上娱乐 1

 

一:搭建二个消除方案框架:RabbitMQ_Demo

目录

  • RabbitMQ 概念
  • exchange沟通机机制
    • 怎么着是交换机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello – Demo(springboot实现)
  • 初识RabbitMQ类别之3。RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生育条件下选拔和产出的标题
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 新闻持久化,断线重连,ACK。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但不可能只是知道其为音信队列,音信代理更合适。RabbitMQ
是二个由 Erlang
语言开发的AMQP(高级新闻队列协议)的开源实现,其内部结构如下:

威尼斯人线上娱乐 2

RabbitMQ作为1个新闻代理,首要和消息社交,负责接收并转载音信。RabbitMQ提供了可相信的音讯机制、跟踪机制和灵活的新闻路由,协助新闻集群和分布式安排。适用于排队算法、秒杀活动、音信分发、异步处理、数据同步、处理耗费时间职务、CQRAV4S等接纳场景。

上边大家就来学学下RabbitMQ。

壹.选拔VS的NuGet安装包管理工科具安装RabbitMQ.Client:

在这之中含有陆个部分:

RabbitMQ 概念

RabbitMQ
即二个新闻队列,重大是用来贯彻应用程序的异步息争耦,同时也能起到音讯缓冲,音信分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。暗许运维端口
567二。

在 RabbitMQ 中,如下图结构:

威尼斯人线上娱乐 3

rabbitmq

  • 右侧 P 代表 生产者,也正是往 RabbitMQ 发音信的先后。
  • 中间正是 RabbitMQ,内部囊括了 沟通机 和 队列。
  • 出手 C 代表 消费者,也正是往 RabbitMQ 拿音信的主次。

那么,里面相比重大的定义有 6个,分别为:虚拟主机,沟通机,队列,和绑定。

  • 虚拟主机:3个虚拟主机持有壹组调换机、队列和绑定。为啥须求八个虚拟主机呢?很简短,RabbitMQ其中,用户只可以在虚拟主机的粒度进行权力决定。
    因而,借使要求禁止A组访问B组的交流机/队列/绑定,必须为A和B分别创造八个虚拟主机。每三个RabbitMQ服务器都有三个暗许的虚拟主机“/”。
  • 交换机:Exchange 用于转载新闻,但是它不会做存款和储蓄 ,要是没有Queue bind 到 Exchange 的话,它会一贯放弃掉 Producer
    发送过来的新闻。

    • 此地有三个比较重大的定义:***路由键 ***
      。新闻到交流机的时候,交互机会转载到相应的行列中,那么到底转载到哪些队列,就要遵照该路由键。
  • 绑定:也便是交流机供给和队列相绑定,那之中如上图所示,是多对多的涉及。

二. 条件搭建

本文首要依照Windows下使用Vs Code 基于.net
core进行demo演示。开首在此以前大家供给准备好以下条件。

  • 安装Erlang运维环境
    威尼斯人线上娱乐 ,下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows开端按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理人身份运维。
  • 逐条执行以下命令运转RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用暗许账号密码(guest/guest)登录即可。

威尼斯人线上娱乐 4

一:RabbitMQ 公用类库项目

exchange调换机机制

3. Hello RabbitMQ

在始发以前大家先来明白下音信模型:
威尼斯人线上娱乐 5
顾客(consumer)订阅有些队列。生产者(producer)创造消息,然后发表到行列(queue)中,队列再将音信发送到监听的主顾。

上边大家我们通过demo来精晓RabbitMQ的大旨用法。

 

二:三个劳动者控制台项目

怎么样是沟通机

rabbitmq的message
model实际上海消防息不直接发送到queue中,中间有二个exchange是做音信分发,producer甚至不明了新闻发送到那多少个队列中去。由此,当exchange收到message时,必须准确通晓该怎么分发。是append到自然规则的queue,依然append到四个queue中,如故被放弃?那个规则都是通过exchagne的4种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer
never sends any messages directly to a queue. Actually, quite often
the producer doesn’t even know if a message will be delivered to any
queue at all.

Instead, the producer can only send messages to an exchange. An
exchange is a very simple thing. On one side it receives messages from
producers and the other side it pushes them to queues. The exchange
must know exactly what to do with a message it receives. Should it be
appended to a particular queue? Should it be appended to many queues?
Or should it get discarded. The rules for that are defined by the
exchange type.

exchange是1个音讯的agent,每二个虚拟的host中都有定义。它的职责是把message路由到分裂的queue中。

叁.一.音信的发送和接到

创制RabbitMQ文件夹,打开命令提醒符,分别创立七个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

大家先来添加音讯发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音讯接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运转音讯接收端,再运营新闻发送端,结果如下图。

威尼斯人线上娱乐 6

从地点的代码中得以观望,发送端和消费端的代码前4步都是同1的。首要的分别在于发送端调用channel.BasicPublish格局发送音讯;而接收端供给实例化一个EventingBasicConsumer实例来展开新闻处理逻辑。此外1些亟待小心的是:音信接收端和出殡和埋葬端的队列名称(queue)必须保持壹致,那里钦定的连串名字为hello。

二.劳动者端代码:

三:七个顾客控制台项目

binding?

exchange和queue通过routing-key关联,这两者之间的涉及是就是binding。如下图所示,X表示沟通机,黄绿代表队列,调换机通过贰个routing-key去binding一个queue,routing-key有如何效益呢?看Direct
exchange类型交流机。

威尼斯人线上娱乐 7

3.二. 循环往复调度

应用工作行列的益处便是它亦可互相的拍卖队列。如若堆积了不少职务,大家只需求加上更多的劳引力(workers)就能够了。大家先运转七个接收端,等待音讯接收,再起步3个殡葬端举办音讯发送。

威尼斯人线上娱乐 8

我们扩张运营八个消费端后的运行结果:

威尼斯人线上娱乐 9

从图中可见,我们循环境与发展送四条音讯,多少个消息接收端按梯次被循环分配。
默许情形下,RabbitMQ将按顺序将每条音讯发送给下二个买主。平均每种消费者将收获同等数量的音讯。那种分发音讯的法子叫做循环(round-robin)。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

项目结构如图:

Directed Exchange

路由键exchange,该调换机械收割到音讯后会把音讯发送到钦定routing-key的queue中。那新闻沟通机是怎么精通的吗?其实,producer
deliver音信的时候会把routing-key add到 message
header中。routing-key只是一个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
那种是例外的Direct
Exchange,是rabbitmq内部暗中认可的一个沟通机。该调换机的name是空字符串,全体queue都默许binding
到该沟通机上。全数binding到该沟通机上的queue,routing-key都和queue的name1样。

叁.三. 信息确认

规行矩步大家地方的demo,1旦RabbitMQ将消息发送到消费端,消息就会立刻从内部存款和储蓄器中移出,无论消费端是还是不是处理完了。在那种场馆下,音讯就会丢掉。

为了保障三个音信永远不会丢掉,RabbitMQ匡助新闻确认(message
acknowledgments)
。当消费端接收新闻还要处理完了后,会发送贰个ack(新闻确认)非复信号到RabbitMQ,RabbitMQ接收到那么些实信号后,就足以去除掉那条已经处理的音信职务。但假使消费端挂掉了(比如,通道关闭、连接丢失等)未有发送ack复信号。RabbitMQ就会知道有个别音讯尚未例行处理,RabbitMQ将会再次将消息入队,假设有其它贰个费用端在线,就会飞速的重新发送到此外三个消费端。

RabbitMQ中平昔不音信超时的定义,唯有当消费端关闭或奔溃时,RabbitMQ才会重复分发音讯。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

器重改动的是将
autoAck:true修改为autoAck:fasle,以及在消息处理实现后手动调用BasicAck方式开始展览手动音讯确认。

威尼斯人线上娱乐 10

从图中可见,音讯发送端连接发送4条音讯,个中消费端一先被分配处理第三条音信,消费端二被循环分配第一条新闻,第2条新闻由于未有空余消费者依旧在队列中。
在消费端2未处理完第一条音讯从前,手动中断(ctrl+c)。大家能够窥见RabbitMQ在下叁次分发时,会先行将被搁浅的新闻分发给消费端一拍卖。

三.主顾端代码:

威尼斯人线上娱乐 11

Topic Exchange

通配符交流机,exchange会把消息发送到三个依然多少个满足通配符规则的routing-key的queue。其中*表号相称2个word,#匹配几个word和途径,路径之间通过.隔绝。如满意a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

三.4. 新闻持久化

音信确认确定保障了尽管消费端卓殊,信息也不会丢掉能够被重复分发处理。可是一旦RabbitMQ服务端极度,音讯依然会丢掉。除非我们钦定durable:true,否则当RabbitMQ退出或奔溃时,音讯将1如既往会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将新闻持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将音讯标记为持久性不能够完全保证音信不会丢掉。固然它报告RabbitMQ将新闻保存到磁盘,不过当RabbitMQ接受音讯还要还尚无保留时​​,还是有1个极短的岁月窗口。RabbitMQ
恐怕只是将新闻保存到了缓存中,并不曾将其写入到磁盘上。持久化是不能肯定保障的,不过对于3个简易职务队列来说早已丰富。假诺必要保障消息队列的持久化,可以运用publisher
confirms.

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

Fanout Exchange

扇形调换机,该调换机会把音讯发送到全部binding到该沟通机上的queue。那种是publisher/subcribe情势。用来做广播最棒。
持有该exchagne上钦赐的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are
bound to it regardless of routing keys or pattern matching as with
direct and topic exchanges. Keys provided will simply be ignored.

三.伍. 公平分发

RabbitMQ的新闻分发暗中认可依据消费端的数量,按顺序循环分发。那样仅是保障了消费端被平均分发音讯的数码,但却忽略了消费端的闲忙意况。那就大概出现某些消费端直接处理耗费时间职责处于阻塞状态,某些消费端直接处理壹般任务处于空置状态,而只是它们分配的职责数量同样。

威尼斯人线上娱乐 12

但大家得以经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告诉RabbitMQ,在未收打消费端的音讯确认时,不再分发消息,也就保证了当消费端处于艰辛景色时,不再分配任务。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

那时候你供给小心的是一旦具有的消费端都远在费力景色,你的行列大概会被塞满。你须求注意那点,要么添加更加多的消费端,要么选择任何策略。

肆.程序结果:

贰:开发以前,要求引用RabbitMQ包

Header Exchange

安装header attribute参数类型的交换机。

4. Exchange

细心的您大概发现上面的demo,生产者和顾客直接是透过一致队列名称进行相称衔接的。消费者订阅有些队列,生产者创造消息公布到行列中,队列再将音讯转发到订阅的主顾。那样就会有3个局限性,即消费者三回只好发送音信到某三个种类。

那消费者怎么着才能发送新闻到多个消息队列呢?
RabbitMQ提供了Exchange,它好像于路由器的职能,它用于对音信进行路由,将音讯发送到多个类别上。Exchange一方面从劳动者接收新闻,另一方面将音讯推送到行列。但exchange必须了然如何处理接收到的音讯,是将其附加到特定队列照旧增大到多少个类别,依然平素忽略。而那么些规则由exchange
type定义,exchange的规律如下图所示。
威尼斯人线上娱乐 13

普遍的exchange type 有以下二种:

  • direct(明显的路由规则:消费端绑定的队列名称必须和新闻发表时钦定的路由名称1致)
  • topic (形式相称的路由规则:帮助通配符)
  • fanout (音信广播,将音讯分发到exchange上绑定的富有队列上)

下边大家就来挨家挨户那介绍它们的用法。

威尼斯人线上娱乐 14

安装相应的Nuget包,恐怕下载相关dll也足以,可是建议在线安装nuget,更方便人民群众

RabbitMQ 的 Hello Demo

设置就隐瞒了,提出依照法定文书档案上做。先贴代码,稍后解释,代码如下:

布局 交流机,队列,交流机与队列的绑定,音信监视容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

配备接收消息者(即消费者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

铺排发送新闻者(即生产者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

上述便可完毕1个简约的 RabbitMQ
德姆o,具体代码在:点这里

这就是说,这里,分为几个部分分析:发新闻,交换机队列,收音信。

  • 对于发送新闻:大家1般能够选取 RabbitTemplate,那么些是 Spring
    封装给了我们,便于我们发送消息,大家调用
    rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送消息。
  • 对此交流机队列:如上代码,我们供给配备调换机
    TopicExchange,配置队列 Queue,并且配备他们中间的绑定 Binding
  • 对此收受新闻:首先供给创制2个音信监听容器,然后把大家的接受者注册到该容器中,那样,队列中有音讯,那么就会调用接收者的应和的办法。如上代码
    container.setMessageListener(listenerAdapter);
    个中,MessageListenerAdapter 能够作为是
    大家接收者的一个包装类,new MessageListenerAdapter(receiver, "receiveMessage");
    指明了1旦有新闻来,那么调用接收者哪个方法进行处理。

4.1 fanout

本着绳趋尺步的构思,大家先来理解下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的新闻都会散发到独具绑定该exchange的行列上去。

威尼斯人线上娱乐 15

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

顾客示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

 

查找:RabbitMQ.Client  
安装新型版即可,不明了怎么设置nuget,请移步 

RabbitMQ 的 Hello Demo(spring xml实现)

spring
xml格局实现RabbitMQ简单,可读性较好,配置简单,配置和兑现如下所示。

上文已经讲述了rabbitmq的陈设,xml情势通过properites文件存放用户配置消息:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,评释连接、调换机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交换机并绑定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置监听-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代码中,引进properties文件就不多说了。

<rabbit:connection-factory>标签申明成立connection的factory工厂。

<rabbit-template>声明spring
template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>宣称1个queue并安装queue的布署项,直接看标签属性就足以领会queue的配置项。

<rabbit:direct-exchange>证明沟通机并绑定queue。

<rabbit:listener-container>表明监听container并安插consumer和监听routing-key。

剩余就总结了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer达成,发送音信依旧采用template的convertAndSend() deliver消息。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

4.2. direct

direct相对于fanout就属于完全合作、单播的方式,路由体制如下图,即队列名称和新闻发送时钦定的路由完全合营时,新闻才会发送到钦定队列上。
威尼斯人线上娱乐 16

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消费者示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

注:在第一步事先,你必要安装RabbitMQ客户端,可从

设置好之后,开始激动的写代码啦!

RabbitMQ 在生产环境下行使和出现的难题

在生养条件中,由于 Spring 对 RabbitMQ
提供了部分方便人民群众的注明,所以首先能够采纳这几个注明。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration
    评释在1个类中结合使用,如若此类能够回到二个RabbitListenerContainerFactory 类型的
    bean,那么就一定于能够把该终端(消费端)和 RabbitMQ
    实行连接。Ps:(生成端不是由此 RabbitListenerContainerFactory 来和
    RabbitMQ 连接,而是通过 RabbitTemplate )
  • @RabbitListener:当对应的种类中有消息的时候,该申明修饰下的主意会被执行。
  • @RabbitHandler:接收者能够监听几个体系,不一致的队列新闻的体系恐怕两样,该注明能够使得差别的新闻让不相同方法来响应。

具体那么些注明的利用,可以参见那里的代码:点这里

率先,生产条件下的 RabbitMQ
或然不会在劳动者大概消费者本机上,所以须要再行定义
ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

此地,能够重复安装需求连接的 RabbitMQ 的
ip,端口,虚拟主机,用户名,密码。

下一场,能够先从生产端记挂,生产端要求接二连三 RabbitMQ,那么能够通过
RabbitTemplate 举办连接。 Ps:(RabbitTemplate
用于生产端发送新闻到调换机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给调换机的音讯是以什么样格式的,在
integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的尾声
template.setExchange(exchangeName); 指明了
要把劳动者要把新闻发送到哪个沟通机上。

有了上述,那么,咱们即可使用
rabbitTemplate.convertAndSend("spring-boot", xxx); 发送新闻,xxx
表示任意档次,因为上述的装置会帮我们把这个品种转化成 JSON 传输。

随后,生产端发送大家说过了,那么今后能够看看消费端:

对此消费端,我们得以只成立
SimpleRabbitListenerContainerFactory,它能够帮大家转变
RabbitListenerContainer,然后大家再使用 @RabbitListener
内定接收者收到音讯时处理的主意。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

这其中
factory.setMessageConverter(integrationEventMessageConverter());
内定了大家承受新闻的时候,以 JSON
传输的新闻可以转换到对应的品类传入到点子中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

或是出现的难题:

4.3. topic

topic是direct的升迁版,是一种形式相配的路由机制。它补助采纳三种通配符来进展方式相称:符号#和符号*。其中*万分贰个单词,
#则表示相称0个或多少个单词,单词之间用.分割。如下图所示。
威尼斯人线上娱乐 17

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

买主示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

        可是RabbitMQ又是依靠于Erlang
OTP平台,所以,安装RabbitMQ在此之前,必要先从

 

消息持久化

在生养条件中,大家必要考虑万一劳动者挂了,消费者挂了,或许 rabbitmq
挂了什么样。一般的话,假若劳动者挂了可能消费者挂了,其实是从未影响,因为音讯就在队列之中。那么万一rabbitmq
挂了,以前在队列之中的音讯如何是好,其实能够做消息持久化,RabbitMQ
会把音信保存在磁盘上。

做法是能够先从 Connection 对象中拿到两个 Channel
信道对象,然后再能够因此该目的设置 音信持久化。

5. RPC

SportagePC——Remote Procedure Call,远程进程调用。
这RabbitMQ怎样开始展览长距离调用呢?示意图如下:
威尼斯人线上娱乐 18
首先步,首假诺展开远程调用的客户端须求钦赐接收远程回调的队列,并注脚消费者监听此行列。
第一步,远程调用的服务端除了要表明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。

远程调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

长距离调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

       
关于那有的的始末,推荐阅读:

三:完成RabbitMQ基本收发消息的意义

生产者只怕消费者断线重连

此间 Spring 有全自动重连机制。

6. 总结

依照上面包车型地铁demo和对三种差别exchange路由体制的上学,大家发现RabbitMQ首倘使涉嫌到以下多少个着力概念:

  1. Publisher:生产者,音讯的发送方。
  2. Connection:互联网连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:沟通器(路由器),负责新闻的路由到对应队列。
  5. Binding:队列与调换器间的涉及绑定。消费者将关爱的队列绑定到钦命沟通器上,以便Exchange能准确分发音讯到钦点队列。
  6. Queue:队列,音讯的缓冲存储区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供能源的逻辑分组和分手。包罗连接,调换,队列,绑定,用户权限,策略等。
  8. Broker:信息队列的服务器实体。
  9. Consumer:消费者,音讯的接收方。

这次作为入门就讲到那里,下次我们来讲课下EventBus +
RabbitMQ
哪些达成事件的分发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

 

1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

ACK 确认机制

各种Consumer大概必要一段时间才能处理完收到的数额。假设在那几个进度中,Consumer出错了,非常退出了,而数据还从未处理完了,那么
分外不幸,这段数据就丢掉了。因为大家采取no-ack的艺术举行确认,也正是说,每趟Consumer接到数据后,而不论是是还是不是处理完结,RabbitMQ Server会立刻把那一个Message标记为实现,然后从queue中除去了。

借使1个Consumer非凡退出了,它处理的数码可见被别的的Consumer处理,这样数据在那种景观下就不会丢掉了(注意是那种景况下)。
为了保障数据不被遗失,RabbitMQ协理音信确认机制,即acknowledgments。为了保障数据能被正确处理而不只是被Consumer收到,那么大家无法运用no-ack。而应该是在处理完数据后发送ack。

在处理多少后发送的ack,就是告诉RabbitMQ数据已经被接受,处理完毕,RabbitMQ能够去安全的删除它了。
设若Consumer退出驾驭而从未发送ack,那么RabbitMQ就会把那个Message发送到下一个Consumer。这样就保险了在Consumer十分退出的场地下数据也不会丢掉。

  此Demo只是‘direct’格局的音讯发送接收方式。

威尼斯人线上娱乐 19威尼斯人线上娱乐 20

村办对 RabbitMQ ACK 的有的难题,求助:点这里

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

总结

  1. RabbitMQ 功效:异步,解耦,缓冲,新闻分发。
  2. RabbitMQ 主要分为二个部分,生产者,交流机和队列,消费者。
  3. 亟需专注消息持久化,指标为了防患 RabbitMQ 宕机;思念 ACK
    机制,目标为了要是顾客对消息的处理退步了,那么继续要怎么着处理。

View Code

写在结尾

  1. 写出来,说出去才知道对不对,知道不对才能修正,校对了才能成长。
  2. 在技能上面,希望大家眼里都容不得沙子。倘若有不规则的地点依然必要立异的地方希望能够提出,格外多谢。

可以看出,首先有七个构造函数,一个是给劳动者采纳,三个是给顾客接纳,注意参数有所区别,能够看来生产者与买主关切的点是不雷同的。

甭管生产或许消费,都以二个客户端,都急需创立贰个RabbitMQ连接并创立三个channel,才方可拓展连锁的操作,这一个操作都以由channel发起的,那样说应该相比白话了。

组织生产者的时候,首假若创制2个转载器,转载器的名字及项目要求定义,

转载器常用类型包蕴两种:direct、fanout、topic,

这几体系型那里说的更精晓:

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送新闻(生产者中发送新闻)

威尼斯人线上娱乐 21威尼斯人线上娱乐 22

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此处发送了3600次,每秒发一遍新闻,奇数和偶数发送的路由规则不平等,会有多少个例外的客户端来接收,那样方便大家测试音信是不是被分发到了差异的体系上

 

3:七个买主项目展开音讯的选择

买主1:

威尼斯人线上娱乐 23威尼斯人线上娱乐 24

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

买主贰:

威尼斯人线上娱乐 25威尼斯人线上娱乐 26

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

四个顾客分别收受差别队列上的新闻

 

4:运行!

先编写翻译一下,到bin目录下先运维生产者,在运作多个买主

威尼斯人线上娱乐 27

 

也得以先关掉消费端,过7秒再关闭生产端,在web
管理界面可以看来今天有三个系列里有音讯,一个三条三个4条

威尼斯人线上娱乐 28

 

 四:总结

一体化例子的富有代码都在那里了,代码里相关心释也很理解,是本人本人达成的第3个RabbitMQ收发效率,实际选取中一定能够有成都百货上千扩充,新手们有思疑大概小编理解的有不规则的位置,烦请评论处提议哈,我们共同进步!


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图