首页 > 编程开发 > Python基础教程 >
-
python基础教程之RabbitMQ(2)
试听地址 https://www.xin3721.com/eschool/pythonxin3721/
创建一个名为"task1"的队列用来投递消息,durable=True队列持久化 12 channel.queue_declare(queue='task1', durable=True) 13 # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''), 14 # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 15 message = ' '.join(sys.argv[1:]) or "很高兴见到你" 16 # 发送消息到指定的路由和路由键 17 channel.basic_publish( 18 exchange='', 19 routing_key='task1', 20 body=message, 21 # 消息持久化 22 properties=pika.BasicProperties(delivery_mode=2) 23 ) 24 print(" [x] Sent %r" % (message,)) 25 # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 26 connection.close()
receiver.py
1 import time 2 3 import pika 4 # 创建凭证,使用rabbitmq用户密码登录 5 # 使用前,必须得验证身份 6 credentials = pika.PlainCredentials("guest","guest") 7 # 建立一个到RabbitMQ服务器的连接。 8 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',credentials=credentials)) 9 channel = connection.channel() 10 # 创建一个名为"task1"的队列用来投递消息 11 channel.queue_declare(queue='task1',durable=True) 12 13 def callbak(ch,method,properties,body): 14 print("[x] Received %r"%body.decode("utf8")) 15 time.sleep(body.decode("utf8").count('.')) 16 print(" [x] Done") 17 # 消息确认 18 ch.basic_ack(delivery_tag=method.delivery_tag) 19 20 # 只处理一条数据,作出响应后再处理下一条 21 channel.basic_qos(prefetch_count=1) 22 # 有消息来临,立即执行callbak,没有消息则夯住,等待消息 23 # 消息来了,立即去取,队列名字为task1 24 channel.basic_consume(queue='task1', on_message_callback=callbak) 25 # 开始消费,接收消息 26 channel.start_consuming()
注:会发现此时消息是谁有时间谁去处理,而不是轮询分配的方式。
11.exchange交换机
前面的学习只搭建了一个工作队列,每个任务只分发给一个工作者(worker),如果有些任务需要多个工作者都能够拿到,那么就需要使用到exchange。一个消息给到多个消费者,这种模式叫"发布/订阅"
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。而是将消息发送给交换机,由交换机发送给队列。
exchange的四种模式
fanout Exchange
扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
topic Exchange
主题交换机是一种支持正则匹配的exchange,发送到topic exchange上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。该exchange的routing_key需要有一定的规则,交换机和队列的binding_key需要采用.#......的格式,每个部分用.分开,其中:*表示一个单词,#表示任意数量(零个或多个)单词。
headers Exchange
头交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个exchange声明成Headers exchange,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。绑定exchange和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比direct exchange,首部交换机的优势是匹配的规则不被限定为字符串(string)。
channel.exchange_declare(exchange='logs', exchange_type='fanout')
匿名的交换器
前面我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。
回想我们之前是如何发布一则消息:
channel.basic_publish(exchange='', routing_key='hello', body=message)
exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。
现在,我们就可以发送消息到一个具名交换机了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
临时队列
临时队列手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。只需要在调用queue_declare方法的时候,不提供queue参数的值就可以了:
result = channel.queue_declare(queue="")
我们可以通过result.method.queue获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
result.method.queue
当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的。
result = channel.queue_declare(queue="",exclusive=True) # 断开连接,队列立即删除
绑定(bindings)
已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
你可以使用rabbitmqctl list_bindings
列出所有现存的绑定。
发布/订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
扇形交换机(fanout)
如:日志文件。
sender.py
import pika import sys # 创建凭证,使用rabbitmq用户密码登录 credentials = pika.PlainCredentials("guest", "guest") # 创建一个rabbitmq连接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() # 指定交换机类型 channel.exchange_declare(exchange='m1',exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "很高兴见到你" # 指定发送给哪个交换机(exchange='')路由键(routing_key=''),参数body值发送的数据 channel.basic_publish( exchange='m1', routing_key='', body=message, ) print(" [x] Sent %r" % (message,)) connection.close()
receiver.py
import time import pika credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m1', exchange_type='fanout') # 随机创建一个队列,扇形交换机要将消息给所有人,所以不需要特意指定一个队列的名称。 result = channel.queue_declare(queue="", exclusive=True) # 获取队列的名字 queue_name = result.method.queue # 创建绑定,交换机和队列 channel.queue_bind(exchange='m1', queue=queue_name) def callbak(ch, method, properties, body): print("[x] Received %r" % body.decode("utf8")) time.sleep(body.decode("utf8").count('.')) print(" [x] Done") channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True) # 开始消费,接收消息 channel.start_consuming()
这样我们就完成了。如果你想把结果保存到日志文件中,只需要打开控制台输入:
python receiver.py > logs_from_rabbit.log # 使用该方式启动receiver.py
直连交换机
绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。
绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
直连交换机(direct exchange)将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。绑定的都是同一个路由,通过路由键来区分是谁处理任务。
比如:有三个任务,分别分配给三个人,任务1-->p1、任务2-->p2、任务3-->p3,你是一个任务发布者,说我想要任务1的结果了,对应的就是p1给你答案,想要任务2的结果了,对应的就是p2给你答案。
如上图所示:直连交换机绑定了Q1和Q2两个队列,第一个队列使用orange作为绑定键,第二个队列使用black和green作为绑定键,当路由键为orange时会被路由到Q1这个队列。为black或green时会被路由到Q2这个队列。
多个绑定(Multiple bindings)
多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。
sender.py
import pika routing_key=input("请输入routing_key:") credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() # 指定直连交换机 channel.exchange_declare(exchange='task1',exchange_type='direct') message = f"很高兴见到你{routing_key}" channel.basic_publish( exchange='task1', routing_key=routing_key, body=message, ) print(f"给{routing_key}发送消息成功") connection.close()
receiver1.py
import time import pika credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='task1', exchange_type='direct') # 随机创建一个队列 result = channel.queue_declare(queue="", exclusive=True) # 获取队列的名字 queue_name = result.method.queue # 绑定交换机和队列 channel.queue_bind(exchange='task1', queue=queue_name, routing_key="xiaoqi") def callbak(ch, method, properties, body): print(body.decode("utf8")) channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True) # 开始消费,接收消息 channel.start_consuming()
receiver2.py
import time import pika credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='task1', exchange_type='direct') # 随机创建一个队列 result = channel.queue_declare(queue="", exclusive=True) # 获取队列的名字 queue_name = result.method.queue # 绑定交换机和队列 channel.queue_bind(exchange='task1', queue=queue_name, routing_key="dada") def callbak(ch, method, properties, body): print(body.decode("utf8")) channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True) # 开始消费,接收消息 channel.start_consuming()
应用场景:可以是错误日志,不同日志保存到不同的文件。
如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
python receive_logs_direct.py warning error > logs_from_rabbit.log # 此方式开启receiver
主题交换机
发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.
分隔开的词语列表。词语的个数可以随意,但是不要超过255字节。
*
(星号) 用来表示一个单词.
#
(井号) 用来表示任意数量(零个或多个)单词。
主题交换机是很强大的,它可以表现出跟其他交换机类似的行为
-
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
-
当
*
(星号) 和#
(井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
sender.py
import pika routing_key=input("请输入routing_key:") credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() # 指定直连交换机 channel.exchange_declare(exchange='task2',exchange_type='topic') message = f"很高兴见到你{routing_key}" channel.basic_publish( exchange='task2', routing_key=routing_key, body=message, ) print(f"给{routing_key}发送消息成功") connection.close()
receiver.py
import time import pika credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='task2', exchange_type='topic') # 随机创建一个队列 result = channel.queue_declare(queue="", exclusive=True) # 获取队列的名字 queue_name = result.method.queue # 绑定交换机和队列 channel.queue_bind(exchange='task2', queue=queue_name, routing_key="*.md") def callbak(ch, method, properties, body): print(body.decode("utf8")) # 有消息来临,立即执行callbak,从队列q1中取数据;没有消息则夯住,等待消息 channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True) # 开始消费,接收消息 channel.start_consuming()
官方文档:http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html
12.RPC远程过程调用
RPC远程过程调用,在计算机A上的进程,调用另外一台计算机B的进程,A上的进程被挂起,B上的被调用进程开始执行后,产生返回值给A,A继续执行。它是一个计算机通信协议。
RPC的关键不在于它是什么通信协议,重点在于可以通过RPC进行解耦服务。
应用:
一个电商的下单过程,涉及物流、支付、库存、红包等多个系统,多个系统又在多个服务器上,由不同的技术团队负责,整个下单过程,需要所有团队进行远程调用。
下单 { 库存>减少库存 支付>扣款 红包>减免红包 物流>生成订单}
python实现RPC
利用RabbitMQ构建一个RPC系统,包含了客户端和RPC服务器,依旧使用pika模块
Callback queue 回调队列
一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to
。
result = channel.queue_declare(queue="",exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
Correlation id 关联标识
一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id
属性,这样客户端在回调队列中根据correlation_id
字段的值就可以分辨此响应属于哪个请求。
客户端发送请求:某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息。
服务器端工作流: 等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中。
客户端接受处理结果: 客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用。
client.py
import pika import uuid class FibonacciRpcClient(object): def __init__(self): credentials = pika.PlainCredentials("guest", "guest") # 客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应 # 建立连接,指定服务器的ip地址 self.connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', credentials=credentials)) # 建立一个会话,每个channel代表一个会话任务 self.channel = self.connection.channel() # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次 # exclusive=True 参数是指只对首次声明它的连接可见 # exclusive=True 会在连接断开的时候,自动删除 result = self.channel.queue_declare(queue="", exclusive=True) # 将次队列指定为当前客户端的回调队列,用户接收服务端给客户端发送的信息 self.callback_queue = result.method.queue # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; # 有消息来临,立即执行callbak,从队列on_response中取数据;没有消息则夯住,等待消息 self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) # 对回调队列中的响应进行处理的函数 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # 发出RPC请求 # 例如这里服务端就是一场马拉松,跑完一段将棒交给下一个人,这个过程是发送rpc请求 def call(self, n): # 初始化 response self.response = None # 生成correlation_id 关联标识,通过python的uuid库,生成全局唯一标识ID,保证时间空间唯一性 self.corr_id = str(uuid.uuid4()) # 发送RPC请求内容到RPC请求队列`task2`,同时发送的还有`reply_to`和`correlation_id` self.channel.basic_publish(exchange='', routing_key='main_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) # 建立客户端 fibonacci_rpc = FibonacciRpcClient() # 发送RPC请求,丢进rpc队列,等待客户端处理完毕,给与响应 print("发送了请求sum(100)") response = fibonacci_rpc.call(100) print("得到远程结果响应%r" % response)
server.py
import pika credentials = pika.PlainCredentials("guest", "guest") # 建立连接,服务器地址为localhost,也可指定ip地址 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials)) # 建立会话 channel = connection.channel() # 模拟一个进程,例如接力比赛 def sum(n): n+=100 return n # 对RPC请求队列中的请求进行处理 def on_request(ch, method, props, body): print(body,type(body)) n = int(body) print(" 正在处理sum(%s)..." % n) # 调用数据处理方法 response = sum(n) # 将处理结果(响应)发送到回调队列 ch.basic_publish(exchange='', # reply_to代表回复目标 routing_key=props.reply_to, # correlation_id(关联标识):用来将RPC的响应和请求关联起来。 properties=pika.BasicProperties(correlation_id= props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) print(" 处理完成sum(%s)" % n) # 负载均衡,同一时刻发送给该服务器的请求不超过一个 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='main_queue',on_message_callback=on_request) print("等待接收rpc请求") #开始消费 channel.start_consuming()