威尼斯人线上娱乐

【威尼斯人线上娱乐】新闻队列

15 4月 , 2019  

RabbitMQ队列

先是大家在讲rabbitMQ在此以前我们要说一下python里的queue:2者干的政工是千篇一律的,都以队列,用于传递消息

在python的queue中有多个一个是线程queue,2个是经过queue(multiprocessing中的queue)。线程queue无法跨进程,用于两个线程之间进行数量同步交互;进度queue只是用来父进度与子进度,可能同属于同意父进度下的三个子进程实行相互。也正是说假设是四个精光独立的先后,固然是python程序,也照样不可能用那几个历程queue来通信。这即便大家有五个独立的python程序,分属于多少个进度,恐怕是python和其它语言

安装:windows下

首先需求设置 Erlang环境

官网: 

Windows版下载地址:

Linux版:     使用yum安装

 

然后安装RabbitMQ了 

第3下载RabbitMQ 的Windows版本

下载地址:

安装pika:

事先安装过了pip,直接打开cmd,运维pip install pika

安装收尾之后,实现二个最简便的行列通信:

威尼斯人线上娱乐 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先成立四当中央的socket,然后建立贰个管道,在管道中发音信,然后声美赞臣个queue,起个系列的名字,之后真正的发新闻(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一运转就直接运转下去,他持续收一条,永远在此间卡住。

在上头不管是produce依然consume,里面都宣示了二个queue,这些是为啥吗?因为我们不知晓是主顾先初始运维照旧生产者先运营,那样1旦未有注脚的话就会报错。

上面大家来看一下一对多,即二个劳动者对应四个买主:

第2大家运转一个买主,然后不断的用produce去发送数据,大家可以看出顾客是通过1种轮询的法门进行不断的接受多少,每一个顾客消费三个。

这正是说只要大家顾客接受了音讯,然后处理那一个新闻须求30分钟,在处理的进程中,消费者断电了宕机了,那消费者还未有处理完,我们设那几个职务我们不可能不处理完,这大家相应有1个肯定的音信,说这些职责到位了可能是从未有过到位,所以自个儿的劳动者要确认消费者是还是不是把这一个任务处理完了,消费者处理完事后要给这么些生产者服务器端发送三个肯定音信,生产者才会把那个任务从音信队列中删去。如若没有处理完,消费者宕机了,未有给劳动者发送确认音信,那就代表从未处理完,那我们看看rabbitMQ是怎么处理的

咱们得以在消费者的callback中添加两个time.sleep()实行效仿宕机。callback是一个回调函数,只要事件一触发就会调用这一个函数。函数执行完了就表示音讯处理完了,假若函数未有处理完,那就印证。。。。

大家能够看到在顾客代码中的basic_consume()中有贰个参数叫no_ack=True,那个意思是那条音信是不是被拍卖完都不会发送确认音讯,壹般大家不加这些参数,rabbitMQ暗中认可就会给你设置成新闻处理完了就活动发送确认,大家前日把那么些参数去掉,并且在callback中添加一句话运维:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

威尼斯人线上娱乐 2威尼斯人线上娱乐 3威尼斯人线上娱乐 4

运作的结果正是,笔者先运转一遍生产者,数据被消费者1吸收到了,不过自身把顾客1宕机,结束运作,那么消费者二就吸收接纳了消息,即要是消费者未有发送确认新闻,生产者就不会把消息删除。

RabbitMQ音讯持久化:

咱俩能够转变好多的音信队列,那大家怎么查看新闻队列的气象呢:rabbitmqctl.bat
list_queues

威尼斯人线上娱乐 5

近期的意况是,新闻队列中还有消息,不过服务器宕机了,那这几个新闻就丢了,那自个儿就须要以此音讯强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每一趟注脚队列的时候增进二个durable参数(客户端和劳务器端都要添加那么些参数),

威尼斯人线上娱乐 6

在这几个状态下,我们把rabbitMQ服务珍视启,发现唯有队列名留下了,但是队列中的音讯并没有了,那样我们还要求在劳动者basic_publish中添加贰个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

诸如此类就能够使得新闻持久化

今昔是贰个劳动者对应八个顾客,很公正的收发收发,然而事实上的图景是,大家机器的安插是不等同的,有的配置是单核的一部分配置是多核的,恐怕i柒处理器处理四条音信的时候和其他的微型计算机处理一条消息的光阴大致,那差的微处理器那里就会堆积音信,而好的处理器那里就会形成闲置,在现实中做运转的,大家会在负载均衡中设置权重,什么人的安顿高权重高,职务就多一些,可是在rabbitMQ中,大家只做了2个简练的拍卖就能够达成公平的信息分发,你有多大的力量就处理多少音讯

即:server端给客户端发送音信的时候,先检查未来还有多少音讯,假设当前音信尚未处理完结,就不会发送给这么些消费者音讯。假诺当前的买主绝非消息就发送

其贰头须要在顾客端进行修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 我们在变更3个consume二,在callback中sleep20秒来模拟

威尼斯人线上娱乐 7威尼斯人线上娱乐 8威尼斯人线上娱乐 9

本人先运转四个produce,被consume接受,然后在起步一个,就被consumer2接受,可是因为consumer第22中学sleep20秒,处理慢,所以那时在开发银行produce,就又给了consume实行拍卖

 

python学习之RabbitMQ—–音信队列,

RabbitMQ队列

前言:这一次整治写壹篇有关rabbitMQ的博客,比较上1篇redis,感觉rabbitMQ难度是增强不少。那篇博客会插入1些英文讲解,不过简单掌握的。rabbitMQ的下载与安装,请参见redis&rabbitMQ安装。

Publish\Subscrible(音信发布\订阅)

前边都以一对1的发送接收数据,那笔者想一对多,想广播壹样,生产者发送贰个新闻,全数顾客都收下音讯。那大家如何是好啊?这年我们即将用到exchange了

exchange在一端收音信,在另一端就把消息放进queue,exchange必须精确的明亮收到的音信要干什么,是不是合宜发到3个一定的queue依旧发给许多queue,恐怕说把她丢掉,那么些都被exchange的档次所定义

exchange在概念的时候是有项目的,以决定到底是那多少个queue符合条件,能够承受新闻:

fanout:全数bind到此exchange的queue都能够承受新闻

direct:通过rounroutingKey和exchange决定的非凡唯壹的queue还可以音讯

topic:全体符合routingKey的routingKey所bind的queue尚可音信

headers:通过headers来支配把音信发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此处的exchange从前是空的,未来赋值log;在那里也从不表明queue,广播不须求写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在顾客那里大家有定义了八个queue,注意一下表明中的内容。可是我们在发送端未有注脚queue,为啥发送端不供给接收端供给呢?在consume里有3个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还须求三个queue_【威尼斯人线上娱乐】新闻队列。name

运作结果:

威尼斯人线上娱乐 10威尼斯人线上娱乐 11威尼斯人线上娱乐 12威尼斯人线上娱乐 13

就一定于收音机一样,实时播报,打开四个顾客,生产者发送一条数据,然后二个买主同时接受到

RabbitMQ队列

首先大家在讲rabbitMQ在此以前大家要说一下python里的queue:贰者干的事情是同壹的,都以队列,用于传递新闻

在python的queue中有四个二个是线程queue,三个是进度queue(multiprocessing中的queue)。线程queue无法跨进度,用于多个线程之间展开数据同步交互;进程queue只是用于父进度与子过程,大概同属于同意父进度下的多少个子进度实行互动。也便是说如若是八个完全独立的顺序,尽管是python程序,也依然不可见用这一个进程queue来通讯。那倘若大家有五个单身的python程序,分属于五个进程,可能是python和别的语言

安装:windows下

第二供给安装 Erlang环境 官网: 
Windows版下载地址:
Linux版:     使用yum安装   然后安装RabbitMQ了  首先下载RabbitMQ
的Windows版本 下载地址:

安装pika:

事先安装过了pip,直接打开cmd,运营pip install pika

设置收尾之后,完成1个最简便的队列通讯:

威尼斯人线上娱乐 14

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创制2当中坚的socket,然后建立三个管道,在管道中发音信,然后声美素佳儿(Friso)个queue,起个类别的名字,之后真正的发音信(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要壹运行就径直运维下去,他不止收一条,永远在此处卡住。

在地方不管是produce还是consume,里面都声称了一个queue,这几个是怎么呢?因为大家不知情是消费者先起先运维照旧生产者先运转,那样只要没有申明的话就会报错。

上面大家来看一下一对多,即二个劳动者对应多个买主:

第三大家运营一个顾客,然后不断的用produce去发送数据,大家得以看看顾客是通过1种轮询的措施实行持续的收受多少,每一个消费者消费1个。

那正是说只要我们顾客接受了音信,然后处理那么些信息须要30分钟,在处理的进度中,消费者断电了宕机了,那消费者还未有拍卖完,大家设那个职分大家务必处理完,那大家应有有三个承认的音讯,说那些职责完结了仍旧是从未完毕,所以本人的劳动者要确认消费者是或不是把这一个职分处理完了,消费者处理完今后要给那几个生产者服务器端发送多少个认可音讯,生产者才会把这么些义务从新闻队列中删除。假如未有处理完,消费者宕机了,未有给劳动者发送确认音信,那就代表从未拍卖完,那我们看看rabbitMQ是怎么处理的

大家得以在消费者的callback中添加1个time.sleep()进行效仿宕机。callback是3个回调函数,只要事件一触发就会调用那么些函数。函数执行完了就表示音信处理完了,假设函数未有拍卖完,这就表明。。。。

咱俩可以观察在顾客代码中的basic_consume()中有三个参数叫no_ack=True,那个意思是这条音讯是或不是被拍卖完都不会发送确认音信,1般大家不加这一个参数,rabbitMQ私下认可就会给您设置成音讯处理完了就活动发送确认,大家今天把这一个参数去掉,并且在callback中添加一句话运维:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

威尼斯人线上娱乐 15威尼斯人线上娱乐 16威尼斯人线上娱乐 17

运营的结果正是,笔者先运转三遍生产者,数据被消费者一吸收到了,可是作者把顾客1宕机,甘休运转,那么消费者二就接收了新闻,即倘若消费者未有发送确认音信,生产者就不会把音讯删除。

RabbitMQ消息持久化:

大家可以转移好多的音讯队列,那我们怎么查看音讯队列的景况吧:rabbitmqctl.bat
list_queues

威尼斯人线上娱乐 18

前日的情形是,新闻队列中还有音信,可是服务器宕机了,那那么些新闻就丢了,那我就必要以此消息强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每一次注明队列的时候添加二个durable参数(客户端和劳动器端都要增加这么些参数),

威尼斯人线上娱乐 19

在这些景况下,我们把rabbitMQ服务注重启,发现唯有队列名留下了,但是队列中的音信未有了,那样我们还须求在劳动者basic_publish中添加3个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

这么就足以使得新闻持久化

于今是贰个劳动者对应多少个顾客,很公正的收发收发,不过实际上的情事是,大家机器的安插是不一致的,有的配置是单核的片段配置是多核的,可能i七处理器处理四条新闻的时候和别的的微处理器处理一条信息的年华东军大多,那差的处理器那里就会积聚音信,而好的总计机那里就会形成闲置,在切实可行中做运营的,大家会在负载均衡中安装权重,何人的布置高权重高,任务就多一些,可是在rabbitMQ中,大家只做了3个简易的拍卖就能够达成公平的音信分发,你有多大的力量就处理多少讯息

即:server端给客户端发送音信的时候,先检查未来还有多少音讯,固然当前消息并未有处理实现,就不会发送给这一个消费者消息。若是当前的消费者绝非新闻就发送

其一头要求在顾客端举办修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在转变多个consume贰,在callback中sleep20秒来模拟

威尼斯人线上娱乐 20威尼斯人线上娱乐 21威尼斯人线上娱乐 22

本身先运行两个produce,被consume接受,然后在开行三个,就被consumer2接受,不过因为consumer第22中学sleep20秒,处理慢,所以那时候在起步produce,就又给了consume实行拍卖

 

rabbitMQ是音讯队列;想想从前的我们学过队列queue:threading
queue(线程queue,多少个线程之间进行数量交互)、进度queue(父进度与子进度展开交互或许同属于同1父进度下的多少个子进度展开互动);假若七个独立的次序,那么之间是无法透过queue举办交互的,那时候我们就必要壹当中级代理即rabbitMQ

rabbitMQ是音信队列;想想以前的大家学过队列queue:threading
queue(线程queue,八个线程之间举办数量交互)、进度Queue(父进程与子进度展开交互或然同属于同1父进度下的四个子进度展开互动);借使八个独立的次序,那么之间是无法透过queue进行交互的,那时候我们就要求1个中级代理即rabbitMQ.

有选拔的收受音讯(exchange_type = direct)

RabbitMQ还帮忙依照重点字发送,即:队列绑定关键字,发送者将数据遵照首要字发送到音信exchange,exchange根据重大字判定应该将数据发送到钦赐的系列

威尼斯人线上娱乐 23

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

更是密切的过滤(exchange_type=topic)

威尼斯人线上娱乐 24

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

如上都是服务器端发音信,客户端收新闻,音讯流是单向的,那即便大家想要发一条命令给长途的客户端去执行,然后想让客户端执行的结果再次回到,则那种方式叫做rpc

RabbitMQ RPC

威尼斯人线上娱乐 25

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身三个打断形式,未有音信就等候音讯,有音信就收过来

self.connection.process_data_events()是叁个非阻塞版的start_consuming,正是说发了多个事物给客户端,每过一点时间去检查有未有新闻,即便未有新闻,能够去干其他政工

reply_to = self.callback_queue是用来选择反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第二在客户端会通过uuid四生成,第1在劳务器端重临执行结果的时候也会传过来1个,所以说如若服务器端发过来的correlation_id与团结的id相同
,那么服务器端发出来的结果就必然是本人正好客户端发过去的下令的实行结果。以后就三个劳动器端三个客户端,无所谓缺人不认可。未来客户端是非阻塞版的,大家得以不让它打字与印刷未有消息,而是举办新的通令,那样就两条音讯,不必然按梯次实现,那大家就需求去确认各样再次回到的结果是哪些命令的实践结果。

总体的情势是这么的:生产者发了二个限令给消费者,不知情客户端几时回来,依旧要去收结果的,不过它又不想进去阻塞形式,想每过1段时间看那一个音信收回来未有,就算新闻收回来了,就象征收完了。 

运行结果:

威尼斯人线上娱乐 26威尼斯人线上娱乐 27

劳务器端开启,然后在开行客户端,客户端先是等待音讯的出殡和埋葬,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

Publish\Subscrible(音讯公布\订阅)

眼下都是一对一的出殡和埋葬接收数据,那自个儿想一对多,想广播壹样,生产者发送二个音信,全部顾客都接受音讯。那大家如何做呢?这年我们就要用到exchange了

exchange在1端收新闻,在另壹端就把新闻放进queue,exchange必须可信的知情收到的消息要干什么,是不是应该发到四个一定的queue如故发给许多queue,恐怕说把她放任,那一个都被exchange的门类所定义

exchange在概念的时候是有档次的,以决定到底是那一个queue符合条件,能够承受音讯:

fanout:全数bind到此exchange的queue都足以接受音讯

direct:通过rounroutingKey和exchange决定的13分唯1的queue能够接到音讯

topic:全体符合routingKey的routingKey所bind的queue能够承受音讯

headers:通过headers来支配把音信发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此间的exchange从前是空的,现在赋值log;在此处也从未申明queue,广播不须要写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在消费者那里大家有定义了1个queue,注意一下诠释中的内容。可是大家在发送端没有证明queue,为何发送端不须求接收端要求吗?在consume里有1个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还索要3个queue_name

运作结果:

威尼斯人线上娱乐 28威尼斯人线上娱乐 29威尼斯人线上娱乐 30威尼斯人线上娱乐 31

就一定于收音机1样,实时播报,打开多个顾客,生产者发送一条数据,然后一个买主同时收取到

音讯队列:

 

有取舍的收受音信(exchange_type = direct)

RabbitMQ还辅助依照重点字发送,即:队列绑定关键字,发送者将数据依照重大字发送到新闻exchange,exchange依据重大字判定应该将数据发送到钦定的类别

威尼斯人线上娱乐 32

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

一发细心的过滤(exchange_威尼斯人线上娱乐 ,type=topic)

威尼斯人线上娱乐 33

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

上述都以服务器端发新闻,客户端收音讯,消息流是单向的,那假若大家想要发一条命令给长途的客户端去实践,然后想让客户端执行的结果再次来到,则那种方式叫做rpc

RabbitMQ RPC

威尼斯人线上娱乐 34

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身1个围堵格局,未有信息就等候音讯,有消息就收过来

self.connection.process_data_events()是2个非阻塞版的start_consuming,正是说发了一个事物给客户端,每过一点时刻去反省有未有音信,要是未有音讯,能够去干别的政工

reply_to = self.callback_queue是用来接纳反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第3在客户端会通过uuid四生成,第一在劳务器端重回执行结果的时候也会传过来叁个,所以说借使服务器端发过来的correlation_id与团结的id相同
,那么服务器端发出来的结果就肯定是作者正要客户端发过去的下令的推行结果。未来就八个劳动器端1个客户端,无所谓缺人不确认。未来客户端是非阻塞版的,大家可以不让它打字与印刷未有音信,而是实行新的授命,那样就两条音信,不自然按顺序达成,这大家就需求去肯定各类再次回到的结果是哪些命令的实行结果。

完整的情势是那般的:生产者发了3个命令给顾客,不掌握客户端何时回来,依然要去收结果的,可是它又不想进入阻塞格局,想每过一段时间看那么些消息收回来未有,假若音讯收回来了,就表示收完了。 

运营结果:

威尼斯人线上娱乐 35威尼斯人线上娱乐 36

劳动器端开启,然后在起步客户端,客户端先是等待音信的发送,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

RabbitMQ队列
首先大家在讲rabbitMQ从前大家要说一下python里的queue:2者干的作业是一律的,都以队列,用于…

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ………..

一、不难的rabbitMQ队列通讯

威尼斯人线上娱乐 37

由上海体育场合能够,数据是首发给exchange沟通器,exchage再发放相应队列。pika模块是python对rabbitMQ的API接口。接收端有一个回调函数,1接收到多少就调用该函数。一条新闻被贰个消费者收到后,该信息就从队列删除。OK,精通下面的知识后,先来看望叁个粗略的rabbitMQ列队通讯。

send端:

 1 import pika
 2 #连上rabbitMQ
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()       #生成管道,在管道里跑不同的队列
 5 
 6 #声明queue
 7 channel.queue_declare(queue='hello1')
 8 
 9 #n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
10 #向队列里发数据
11 channel.basic_publish(exchange='',      #先把数据发给exchange交换器,exchage再发给相应队列
12                       routing_key='hello1', #向"hello'队列发数据
13                       body='HelloWorld!!')  #发的消息
14 print("[x]Sent'HelloWorld!'")
15 connection.close()

receive端:

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()
 5 
 6 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 7 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 8 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 9 # practice to repeat declaring the queue in both programs.
10 channel.queue_declare(queue='hello1')#声明队列,保证程序不出错
11 
12 
13 def callback(ch,method,properties,body):
14     print("-->ch",ch)
15     print("-->method",method)
16     print("-->properties",properties)
17     print("[x] Received %r" % body)         #一条消息被一个消费者接收后,该消息就从队列删除
18 
19 
20 channel.basic_consume(callback,              #回调函数,一接收到消息就调用回调函数
21                       queue='hello1',
22                       no_ack=False)    #消费完毕后向服务端发送一个确认,默认为False
23 
24 print('[*] Waiting for messages.To exit press CTRL+C')
25 channel.start_consuming()

运作结果:(下边包车型客车代码对应自笔者写的笺注相信是看得懂的~)

威尼斯人线上娱乐 38威尼斯人线上娱乐 39

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'

View Code

经过深远的测试,有以下五个意识:

  1. 先运行rabbitMQ_1_send.py发送数据,rabbitMQ_2_receive.py未运维。发现当receive运营时还可以接收数据。
  2. 运行多个(eg:三个)接收数据的客户端,再运维发送端,客户端1收受数额,再运转载送端,客户端二收到多少,再运维发送端,客户端3吸收数量。

RabbitMQ会暗中同意把p发的新闻依次分发给各样消费者(c),跟负载均衡差不离。

 

原理:

二、全英文ack

在看上边的事例,你会发现有一句代码no_ack=False(消费实现后向服务端发送一个承认,暗中同意为False),以自个儿爱沙尼亚语四级飘过的水准,看完上边关于ack的上课感觉写得很牛啊!!于是分享一下:

Doing a task can take a few seconds. You
may wonder what happens if one of the consumers starts a long task and
dies with it only partly done. With our current code once RabbitMQ
delivers message to the customer it immediately removes it from memory.
In this case, if you kill a worker we will lose the message it was just
processing. We’ll also lose all the messages that were dispatched to
this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a
worker dies, we’d like the task to be delivered to another
worker.

In order to make sure a message is never
lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is
sent back from the consumer to tell RabbitMQ that a particular message
had been received, processed and that RabbitMQ is free to delete
it.

If a consumer dies (its channel is
closed, connection is closed, or TCP connection is lost) without sending
an ack, RabbitMQ will understand that a message wasn’t processed fully
and will re-queue it. If there are other consumers online at the same
time, it will then quickly redeliver it to another consumer. That way
you can be sure that no message is lost, even if the workers
occasionally die.

There aren’t any message timeouts;
RabbitMQ will redeliver the message when the consumer dies. It’s fine
even if processing a message takes a very, very long time.

Message
acknowledgments are turned on by default. In previous examples we
explicitly turned them off via the no_ack=True flag. It’s time to
remove this flag and send a proper acknowledgment from the worker, once
we’re done with a task.

Using this code we can be sure that even
if you kill a worker using CTRL+C while it was processing a message,
nothing will be lost. Soon after the worker dies all unacknowledged
messages will be redelivered.

本人把发送端和接收端分别比作生产者与买主。生产者发送职责A,消费者接受职分A并拍卖,处理完后生产者将信息队列中的任务A删除。现在大家相见了四个题材:要是顾客收到职务A,但在拍卖的经过中赫然宕机了。而那时候生产者将消息队列中的职责A删除。实际上任务A并没有得逞拍卖完,约等于丢失了职分/新闻。为解决那一个题材,应使消费者收到职分并成功拍卖完后发送二个ack到生产者!生产者收到ack后就清楚义务A已被成功拍卖,那时才从消息队列少将职分A删除,假使没有接过ack,就需求把任务A发送给下多个顾客,直到职分A被成功拍卖。

 

威尼斯人线上娱乐 40

三、音讯持久化

日前已经通晓,生产者生产数据,消费者再起步是能够接收数据的。

可是,生产者生产数据,然后重启rabbitMQ,消费者是无法接收数据。

eg:音信在传输进程中rabbitMQ服务器宕机了,会发现前边的音讯队列就不存在了,这时大家即将用到音讯持久化,音信持久化会让队列不趁早服务器宕机而未有,会永远的保存下来。上边看下关于消息持久化的英文讲解:

We have learned how to make sure that
even if the consumer dies, the task isn’t lost(by default, if wanna
disable  use no_ack=True). But our tasks will still be lost if RabbitMQ
server stops.

When RabbitMQ quits or crashes it will forget the
queues and messages unless you tell it not to. Two things are
required to make sure that messages aren’t lost: we need to mark both
the queue and messages as durable.

First, we
need to make sure that RabbitMQ will never lose our queue. In order to
do so, we need to declare it as durable:

      1 channel.queue_declare(queue=’hello’,
durable=True)

Although this command is correct by
itself, it won’t work in our setup. That’s because we’ve already defined
a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an
existing queue with different parameters and will return an
error(会曝错) to any program that tries to do that. But there is
a quick workaround – let’s declare a queue with different name, for
exampletask_queue:

      1
channel.queue_declare(queue=’task_queue’, durable=True)

This queue_declare change needs to be
applied to both the producer and consumer code.

At that point we’re sure that
the task_queue queue won’t be lost even if RabbitMQ restarts. Now we
need to mark our messages as persistent –
by supplying a delivery_mode property with a value 2.

      1
channel.basic_publish(exchange=”,
      2
                      routing_key=”task_queue”,
      3
                      body=message,
      4
                      properties=pika.BasicProperties(
      5
                         delivery_mode = 2,      # make message
persistent
      6
                      ))

上面包车型大巴英文对音信持久化讲得很好。音讯持久化分为两步:

  • 持久化队列。通过代码达成持久化hello队列:channel.queue_declare(queue=’hello’,
    durable=True)
  • 持久化队列中的音信。通过代码达成:properties=pika.BasicProperties( delivery_mode = 2, )

那边有个点要留意下:

倘若您在代码中已兑现持久化hello队列与队列中的音讯。那么你重启rabbitMQ后再次运维代码大概会爆错!

因为: RabbitMQ doesn’t allow you to
redefine an existing queue with different parameters and will return an
error.

为了缓解那个题材,能够声贝拉米个与重启rabbitMQ此前差别的队列名(queue_name).

 

一、安装和中央采纳

四、音信公平分发

假定Rabbit只管按梯次把新闻发到各样消费者身上,不思虑消费者负载的话,很只怕出现,2个机器配置不高的消费者那里堆积了成都百货上千音讯处理不完,同时安插高的顾客却间接很自在。为焚薮而田此难题,能够在挨家挨户消费者端,配置perfetch=一,意思正是告诉RabbitMQ在小编这些消费者当前音信还没处理完的时候就毫无再给作者发新音信了。

威尼斯人线上娱乐 41

 

带音信持久化+公平分发的欧洲经济共同体代码

劳动者端:

威尼斯人线上娱乐 42威尼斯人线上娱乐 43

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)  #队列持久化
 9  
10 message = ' '.join(sys.argv[1:]) or"Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent消息持久化
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

View Code

买主端:

威尼斯人线上娱乐 44威尼斯人线上娱乐 45

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4  
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.queue_declare(queue='task_queue', durable=True)
10 print(' [*] Waiting for messages. To exit press CTRL+C')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     time.sleep(body.count(b'.'))
15     print(" [x] Done")
16     ch.basic_ack(delivery_tag =method.delivery_tag)   
17  
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20                       queue='task_queue')
21  
22 channel.start_consuming()

View Code

自身在运维方面程序时对顾客端里回调函数的一句代码(ch.basic_ack(delivery_tag
=method.delivery_tag))十分狐疑。那句代码去掉消费者端也能壹如既往收到新闻啊。那句代码有毛线用处??

生产者端音讯持久后,须求在消费者端加上(ch.basic_ack(delivery_tag
=method.delivery_tag)): 保障音信被消费后,消费端发送2个ack,然后服务端从队列删除该音讯.

 

安装RabbitMQ服务
 

伍、音讯发布与订阅

事先的例子都基本都以1对一的信息发送和接收,即音讯只好发送到内定的queue里,但多少时候你想让你的音讯被抱有的queue收到,类似广播的作用,那时候就要用到exchange了。PS:有趣味的刺探redis的发布与订阅,能够看看自家写的博客python之redis。

An exchange is a very simple thing. On
one side it receives messages from producers and the other side it
pushes them to queues. The exchange must know exactly what to do with a
message it receives. Should it be appended to a particular queue? Should
it be appended to many queues? Or should it get discarded(丢弃). The
rules for that are defined by the exchange type.

Exchange在概念的时候是有档次的,以控制到底是何等Queue符合条件,能够吸收接纳音信

 

fanout: 全体bind到此exchange的queue都足以接过新闻

direct: 通过routingKey和exchange决定的尤其唯壹的queue能够吸收接纳音信

topic:全部符合routingKey(此时能够是1个表达式)的routingKey所bind的queue能够接到音讯

 

表明式符号表达: #代表1个或三个字符,*表示任何字符
     
    例:#.a会匹配a.a,aa.a,aaa.a等
               
*.a会匹配a.a,b.a,c.a等
          
 注:使用RoutingKey为#,Exchange
Type为topic的时候一定于选用fanout

 

下边作者分别讲下fanout,direct,topic:

1、fanout

fanout: 全部bind到此exchange的queue都足以选取音讯

威尼斯人线上娱乐 46

send端:

威尼斯人线上娱乐 47威尼斯人线上娱乐 48

 1 import pika
 2 import sys
 3 
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel=connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                       type='fanout')
 9 
10 message=''.join(sys.argv[1:])or"info:HelloWorld!"
11 channel.basic_publish(exchange='logs',
12                       routing_key='',  #fanout的话为空(默认)
13                       body=message)
14 print("[x]Sent%r"%message)
15 connection.close()

View Code

receive端:

威尼斯人线上娱乐 49威尼斯人线上娱乐 50

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel=connection.channel()
 5 
 6 channel.exchange_declare(exchange='logs',type='fanout')
 7 
 8 #不指定queue名字(为了收广播),rabbit会随机分配一个queue名字,
 9 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 result=channel.queue_declare(exclusive=True)
11 queue_name=result.method.queue
12 
13 #把声明的queue绑定到交换器exchange上
14 channel.queue_bind(exchange='logs',
15                 queue=queue_name)
16 
17 print('[*]Waitingforlogs.ToexitpressCTRL+C')
18 
19 def callback(ch,method,properties,body):
20     print("[x]%r"%body)
21 
22 
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 
27 channel.start_consuming()

View Code

有四个点要小心下:

  • fanout-广播,send端的routing_key=”, #fanout的话为空(暗中同意)

  • receive端有一句代码:result=channel.queue_declare(exclusive=True),成效:不钦定queue名字(为了收广播),rabbitMQ会随机分配一个queue名字,exclusive=True会在采用此queue的消费者断开后,自动将queue删除。

 

贰、有选拔的吸收音信(exchange
type=direct)

RabbitMQ还辅助根据重点字发送,即:队列绑定关键字,发送者将数据依据主要字发送到音讯exchange,exchange依据 关键字
判定应该将数据发送至钦命队列。

威尼斯人线上娱乐 51

send端:

威尼斯人线上娱乐 52威尼斯人线上娱乐 53

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localh'))ost
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] iflen(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity, #关键字不为空,告知消息发送到哪里(info,error~)
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

View Code

receive端:

威尼斯人线上娱乐 54威尼斯人线上娱乐 55

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

实际最发轫自笔者看代码是1脸懵逼的~
上面是本身在cmd举行测试的截图(合营着截图看会简单通晓些),贰个send端,五个receive端(先起receive端,再起receive端):

send端:

威尼斯人线上娱乐 56

receive端-1:

威尼斯人线上娱乐 57

receive端-2:

威尼斯人线上娱乐 58

 

3、更密切的消息过滤topic(供参考)

Although using the direct exchange
improved our system, it still has limitations – it can’t do routing
based on multiple criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix tool, which routes logs based on both severity
(info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’.

感觉到笔者英文水准不高啊~,我相比较着垃圾有道翻译,加上自身的敞亮,大致知道地点在讲哪些。

举例:
假诺是系统的错误,就把音信发送到A,假如是MySQL的失实,就把新闻发送到B。可是对B来说,想达成接收MySQL的错误音信,能够用有取舍的收纳音讯(exchange type=direct),让首要字为error就贯彻了哟!现在B有个要求:不是持有的错误新闻都接到,只收取内定的不当。在某种音讯再举行过滤,那正是更加细致的音讯过滤topic。

 

send端:

威尼斯人线上娱乐 59威尼斯人线上娱乐 60

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')  #类型为topic
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

View Code

receive端:

威尼斯人线上娱乐 61威尼斯人线上娱乐 62

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

 

 

python安装RabbitMQ模块

六、RPC(Remote Procedure Call)

奇骏PC的概念可看小编百度的(其实就恍如小编事先做的FTP,作者从客户端发二个限令,服务端重临相关新闻):

威尼斯人线上娱乐 63威尼斯人线上娱乐 64

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

View Code

上边重点讲下CRUISERPC通讯,笔者刚开头学挺难的,学完以往感觉RubiconPC通讯的盘算很有启发性,代码的例子写得也很牛!!

威尼斯人线上娱乐 65

client端发的音信被server端接收后,server端会调用callback函数,执行职分后,还必要把相应的新闻发送到client,可是server如何将消息发还给client?假设有八个client连接server,server又怎么精通是要发给哪个client??

福睿斯PC-server暗中认可监听rpc_queue.肯定不可能把要发放client端的音讯发到rpc_queue吧(rpc_queue是监听client端发到server端的数码)。

理所当然的方案是server端另起二个queue,通过queue将新闻重回给对应client。但难题又来了,queue是server端起的,故client端肯定不掌握queue_name,连queue_name都不晓得,client端接收毛线的数额??

消除措施:

客户端在发送指令的还要报告服务端:职责履行完后,数据通过某队列再次来到结果。客户端监听该队列就OK了。

client端:

 1 import pika
 2 import uuid
 3 
 4 
 5 class FibonacciRpcClient(object):
 6     def __init__(self):
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 
 9         self.channel = self.connection.channel()
10         #随机建立一个queue,为了监听返回的结果
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue   ##队列名
13 
14         self.channel.basic_consume(self.on_response,  #一接收客户端发来的指令就调用回调函数on_response
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):  #回调
19         #每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
20         #此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果
21         if self.corr_id == props.correlation_id:
22             self.response = body
23 
24     def call(self, n):
25         self.response = None    ##指令执行后返回的消息
26         self.corr_id = str(uuid.uuid4())   ##可用来标识指令(顺序)
27         self.channel.basic_publish(exchange='',
28                                    routing_key='rpc_queue', #client发送指令,发到rpc_queue
29                                    properties=pika.BasicProperties(
30                                        reply_to=self.callback_queue, #将指令执行结果返回到reply_to队列
31                                        correlation_id=self.corr_id,
32                                    ),
33                                    body=str(n))
34         while self.response is None:
35             self.connection.process_data_events() #去queue接收数据(不阻塞)
36         return int(self.response)
37 
38 
39 fibonacci_rpc = FibonacciRpcClient()
40 
41 print(" [x] Requesting fib(30)")
42 response = fibonacci_rpc.call(30)
43 print(" [.] Got %r" % response)

server端:

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     host='localhost'))
 6 
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='rpc_queue')
10 
11 
12 def fib(n):
13     if n == 0:
14         return 0
15     elif n == 1:
16         return 1
17     else:
18         return fib(n - 1) + fib(n - 2)
19 
20 
21 def on_request(ch, method, props, body):
22     n = int(body)
23 
24     print(" [.] fib(%s)" % n)
25     response = fib(n)  #从客户端收到的消息
26 
27     ch.basic_publish(exchange='',   ##服务端发送返回的数据到props.reply_to队列(客户端发送指令时声明)
28                      routing_key=props.reply_to,  #correlation_id (随机数)每条指令都有随机独立的标识符
29                      properties=pika.BasicProperties(correlation_id= \
30                                                          props.correlation_id),
31                      body=str(response))
32     ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端持久化
33 
34 
35 channel.basic_qos(prefetch_count=1)  #公平分发
36 channel.basic_consume(on_request,    #一接收到消息就调用on_request
37                       queue='rpc_queue')
38 
39 print(" [x] Awaiting RPC requests")
40 channel.start_consuming()

 

中转表明出处: 

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

二、达成最不难易行的类别通讯

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)   #接受到消息后不返回ack,无论本地是否处理完消息都会在队列中消失
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

注:windows连linux上的rabbitMQ会并发报错,要求提供用户名密码

3、RabbitMQ音信分发轮询

先运营音讯生产者,然后再分别运维1个顾客,通过生产者多发送几条音讯,你会发现,这几条音信会被逐1分配到各类消费者身上

威尼斯人线上娱乐 66

 

在那种情势下,RabbitMQ会默许把p发的音讯公平的逐条分发给各类消费者(c),跟负载均衡差不离

威尼斯人线上娱乐 67威尼斯人线上娱乐 68

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

威尼斯人线上娱乐 69威尼斯人线上娱乐 70

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

通过履行pubulish.py和consume.py能够完结地点的音信公平分发,那假诺c壹收到新闻之后宕机了,会冒出哪些情状呢?rabbitMQ是什么样处理的?今后大家模拟一下

威尼斯人线上娱乐 71威尼斯人线上娱乐 72

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

威尼斯人线上娱乐 73威尼斯人线上娱乐 74

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

在consume.py的callback函数里扩张了time.sleep模拟函数处理,通过地方程序进行效仿发现,c一收下到音信后并未处理完突然宕机,音讯就从队列上未有了,rabbitMQ把音信删除掉了;倘若程序需要音信必要求处理完才能从队列里删除,这大家就必要对先后开始展览处理一下

威尼斯人线上娱乐 75威尼斯人线上娱乐 76

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

威尼斯人线上娱乐 77威尼斯人线上娱乐 78

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    #time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

透过把consume.py接收端里的no_ack``=``True去掉之后并在callback函数里面添加ch.basic_ack(delivery_tag ``= method.delivery_tag,就能够实现音信不被处理完不能够在队列里清除

翻看新闻队列数:

威尼斯人线上娱乐 79

4、新闻持久化

借使音信在传输进程中rabbitMQ服务器宕机了,会发现后面包车型客车消息队列就不存在了,那时我们将要用到音讯持久化,音讯持久化会让队列不趁早服务器宕机而消退,会永远的保存下去

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

伍、音信公平分发

如果Rabbit只管按顺序把音信发到种种消费者身上,不思量消费者负载的话,很恐怕出现,3个机器配置不高的买主那里堆积了好多音信处理不完,同时计划高的消费者却直接很轻松。为消除此题材,能够在一1消费者端,配置perfetch=壹,意思便是告诉RabbitMQ在自个儿这一个消费者当前音讯还没处理完的时候就毫无再给小编发新新闻了

威尼斯人线上娱乐 80

channel.basic_qos(prefetch_count=1)

带音讯持久化+公平分发

威尼斯人线上娱乐 81威尼斯人线上娱乐 82

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

威尼斯人线上娱乐 83威尼斯人线上娱乐 84

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

6、Publish\Subscribe(音信发表\订阅) 

事先的例子都基本都以一对一的音信发送和吸纳,即音信只可以发送到钦命的queue里,但多少时候你想让你的音讯被有着的Queue收到,类似广播的效劳,那时候就要用到exchange了,

An exchange is a very simple thing. On one
side it receives messages from producers and the other side it pushes
them to queues. The exchange must know exactly what to do with a message
it receives. Should it be appended to a particular queue? Should it be
appended to many queues? Or should it get discarded. The rules for that
are defined by the exchange type.

Exchange在概念的时候是有档次的,以决定到底是如何Queue符合条件,能够收起音讯

fanout: 全体bind到此exchange的queue都能够接收消息
direct: 通过routingKey和exchange决定的万分唯一的queue可以吸纳音讯
topic:全体符合routingKey(此时得以是贰个表达式)的routingKey所bind的queue能够接到讯息

headers: 通过headers
来决定把新闻发给哪些queue

表明式符号表明:#代表3个或多个字符,*表示任何字符

     
 例:#.a会匹配a.a,aa.a,aaa.a等
           
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange
Type为topic的时候一定于接纳fanout 

一fanout接受全数广播:广播表示近日消息是实时的,要是未有三个主顾在收受新闻,消息就会丢掉,在那里消费者的no_ack已经无用,因为fanout不会管你处理新闻甘休未有,发过的新闻不会重发,记住广播是实时的

威尼斯人线上娱乐 85

 

威尼斯人线上娱乐 86威尼斯人线上娱乐 87

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   #广播不用声明queue
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py

威尼斯人线上娱乐 88威尼斯人线上娱乐 89

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,
                                                # exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # 绑定转发器,收转发器上面的数据
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py

贰有选用的接受音讯 direct:
 同fanout一样,
no_ack在此要设置为True,不然队列里多少不会清空(即使也不会重发)**

RabbitMQ还支持依据主要字发送,即:队列绑定关键字,发送者将数据依照重大字发送到新闻exchange,exchange根据关键字 判定应该将数据发送至钦赐队列

威尼斯人线上娱乐 90

 

威尼斯人线上娱乐 91威尼斯人线上娱乐 92

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py

威尼斯人线上娱乐 93威尼斯人线上娱乐 94

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

叁更密切的音信过滤 topic:

Although using the direct exchange improved our system, it still has
limitations – it can’t do routing based on multiple
criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix
tool, which routes logs based on both severity (info/warn/crit…) and
facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’

威尼斯人线上娱乐 95

威尼斯人线上娱乐 96威尼斯人线上娱乐 97

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py

威尼斯人线上娱乐 98威尼斯人线上娱乐 99

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

 

LacrossePC(Remote procedure
call )双向通讯

To illustrate how an RPC service could be
used we’re going to create a simple client class. It’s going to expose a
method named call which sends an RPC request and
blocks until the answer is received:

威尼斯人线上娱乐 100

rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, #只要收到消息就执行on_response
                                   no_ack=True,     #不用ack确认
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    #验证码核对
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    #发送返回信息的队列name
                                       correlation_id=self.corr_id,     #发送uuid 相当于验证码
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming
            print("no messages")
            time.sleep(0.5)     #测试
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    #实例化
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       #执行call方法
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    #回信息队列名
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图