Python RabbitMQ实现简单的进程间通信示例
RabbitMQ 消息队列
threading Queue
进程Queue 父进程与子进程,或同一父进程下的多个子进程进行交互
启动服务: rabbitmq-server start
安装 Python module: pip install pika
RabbitMQ 默认 AMQP 端口为 5672,Web 管理界面(management 插件)默认端口为 15672
rabbitmqctl.bat list_queues
topic: 所有 binding key(此时可以是一个表达式)与消息的 routing key 相匹配的 queue 都可以接收消息
# 代表零个或多个单词,* 代表恰好一个单词(单词之间以 . 分隔)
remote procedure call 双向传输,指令<-------->指令执行结果
实现方法: 创建两个队列,一个队列收指令,一个队列发送执行结果
# Author : Xuefeng
"""Work-queue producer: publish one persistent message to the "hello" queue.

Written against the pika >= 1.0 API.
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

# Declare the queue named "hello". durable=True asks the broker to keep the
# queue definition even if the broker is restarted.
channel.queue_declare(queue="hello", durable=True)

# In RabbitMQ a message is never sent directly to a queue; it always goes
# through an exchange. The empty string selects the default exchange, which
# routes the message to the queue named by routing_key.
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body="Hello world!",
    properties=pika.BasicProperties(
        delivery_mode=2,  # mark the message as persistent
    ),
)
print("[x] sent 'Hello world!'")
connection.close()
# Author : Xuefeng
"""Work-queue consumer: receive messages from the durable "hello" queue.

Messages are acknowledged manually from the callback.
Written against the pika >= 1.0 API.
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

# Declaring is idempotent, so the consumer can start before the producer.
channel.queue_declare(queue="hello", durable=True)


def callback(ch, method, properties, body):
    """Print a received message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param properties: message properties set by the publisher
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x] Recieved %r" % body)
    # Acknowledge manually once the message has been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Hand this consumer at most one unacknowledged message at a time, so work is
# distributed according to how fast each consumer finishes.
channel.basic_qos(prefetch_count=1)

# auto_ack is left at its default (False): the callback acknowledges by
# itself, and combining automatic and manual acknowledgement makes the broker
# close the channel. An unacknowledged message is requeued for another
# consumer if this one stops unexpectedly.
channel.basic_consume(queue="hello", on_message_callback=callback)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
# Author : Xuefeng
"""Broadcast (fanout) producer.

The producer publishes one message to the "logs" fanout exchange; every queue
bound to that exchange at the time receives a copy.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.exchange_declare(exchange="logs", exchange_type="fanout")

# Message text comes from the command line, with a fixed fallback.
message = ' '.join(sys.argv[1:]) or "info:Hello world!"

# A fanout exchange ignores the routing key, so it is left empty.
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message,
)
print("[x] Send %r" % message)
connection.close()
# Author : Xuefeng
"""Broadcast (fanout) consumer.

Binds a broker-named exclusive queue to the "logs" fanout exchange and prints
every message delivered to it.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

# Declare the exchange here as well (idempotent) so that binding does not
# fail when the consumer is started before the producer.
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# An empty queue name lets the broker generate a random one; exclusive=True
# restricts the queue to this connection.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

channel.queue_bind(exchange="logs", queue=queue_name)


def callback(ch, method, properties, body):
    """Print a received message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param properties: message properties set by the publisher
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x] Recieved %r" % body)
    # Acknowledge manually once the message has been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Consume from the queue that was just bound to the exchange. auto_ack stays
# at its default (False) because the callback acknowledges by itself.
channel.basic_consume(queue=queue_name, on_message_callback=callback)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
# Author : Xuefeng
"""Filtered (direct exchange) producer.

Publishes one message to the "direct_logs" exchange using the severity as the
routing key; only queues bound with that exact key receive it.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

# First command-line argument is the severity, the rest is the message text.
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
    exchange="direct_logs",
    routing_key=severity,
    body=message,
)
print("[x] Send %r:%r" % (severity, message))
connection.close()
# Author : Xuefeng
"""Filtered (direct exchange) consumer.

Binds a broker-named exclusive queue to the "direct_logs" exchange once per
severity given on the command line and prints every message delivered to it.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

# An empty queue name lets the broker generate a random one; exclusive=True
# restricts the queue to this connection.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity: the queue receives a message when its
# routing key equals any of them.
for severity in severities:
    channel.queue_bind(exchange="direct_logs",
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    """Print a received message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param properties: message properties set by the publisher
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x] Recieved %r" % body)
    # Acknowledge manually once the message has been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Consume from the queue that was just bound to the exchange. auto_ack stays
# at its default (False) because the callback acknowledges by itself.
channel.basic_consume(queue=queue_name, on_message_callback=callback)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
# Author : Xuefeng
"""Fine-grained filtering (topic exchange) producer.

Publishes one message to the "topic_logs" exchange; consumers choose what
they receive by binding with patterns such as ``*.info``.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

# First command-line argument is used as the routing key of the message,
# the rest is the message text.
binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
    exchange="topic_logs",
    routing_key=binding_key,
    body=message,
)
print("[x] Send %r:%r" % (binding_key, message))
connection.close()
# Author : Xuefeng
"""Fine-grained filtering (topic exchange) consumer.

Binds a broker-named exclusive queue to the "topic_logs" exchange once per
binding key given on the command line and prints every message delivered.
Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

# An empty queue name lets the broker generate a random one; exclusive=True
# restricts the queue to this connection.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested pattern: the queue receives a message when its
# routing key matches any of them.
for binding_key in binding_keys:
    channel.queue_bind(exchange="topic_logs",
                       queue=queue_name,
                       routing_key=binding_key)


def callback(ch, method, properties, body):
    """Print a received message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param properties: message properties set by the publisher
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x] Recieved %r" % body)
    # Acknowledge manually once the message has been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Consume from the queue that was just bound to the exchange. auto_ack stays
# at its default (False) because the callback acknowledges by itself.
channel.basic_consume(queue=queue_name, on_message_callback=callback)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()
# Author : Xuefeng
"""RPC client: send a request to "rpc_queue" and wait for the matching reply.

Written against the pika >= 1.0 API.
"""
import pika
import time
import uuid


class FibonacciRpcClient(object):
    """Client that requests a Fibonacci number from the RPC server."""

    def __init__(self):
        """Connect to the broker and start consuming from a private reply queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()

        # Broker-named exclusive queue on which the results are received.
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue

        # Initialised here so on_response never hits a missing attribute if a
        # message arrives before the first call().
        self.response = None
        self.corr_id = None

        # on_response is invoked for every message arriving on the reply
        # queue; replies are acknowledged automatically.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        """Store the reply body if it belongs to the pending request.

        :param ch: channel the message was delivered on
        :param method: delivery metadata
        :param props: message properties; ``correlation_id`` is compared with
            the id of the pending request
        :param body: reply payload
        :return: None
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to the server and block until the reply arrives.

        :param n: value to send; it is transmitted as ``str(n)``
        :return: the reply converted with ``int()``
        """
        self.response = None
        # A fresh random id ties the reply to this particular request.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n),
        )
        while self.response is None:
            # Non-blocking counterpart of start_consuming(): handles pending
            # messages, if any, and returns either way.
            self.connection.process_data_events()
            print("no message...")
            time.sleep(0.5)
        return int(self.response)


fibonacci_rcp = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rcp.call(30)
print("[x] Rec %r" % response)
# Author : Xuefeng
"""RPC server: answer Fibonacci requests arriving on "rpc_queue".

Written against the pika >= 1.0 API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel; every AMQP operation below goes through it.
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")


def fib(n):
    """Return the n-th Fibonacci number (fib(0) == 0, fib(1) == 1).

    Uses plain recursion; ``n`` is expected to be a non-negative integer.
    """
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    """Compute fib for one request and publish the result to the reply queue.

    :param ch: channel the request was delivered on
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param props: request properties; ``reply_to`` names the queue for the
        answer and ``correlation_id`` is echoed back to the client
    :param body: request payload, parsed with ``int()``
    :return: None
    """
    n = int(body)

    print("[.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id),
        # Send the computed result, not the request that was received.
        body=str(response),
    )
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Hand this server at most one unacknowledged request at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()
到此这篇关于Python RabbitMQ实现简单的进程间通信示例的文章就介绍到这了,更多相关Python RabbitMQ进程间通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
微信公众号搜索 “ 脚本之家 ” ,选择关注
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 进行投诉反馈,一经查实,立即处理!
这篇文章主要介绍了Python转换itertools.chain对象为数组的方法,通过代码给大家介绍了itertools 的 chain() 方法,需要的朋友可以参考下 2020-02-02
Python 数值区间处理_对interval 库的快速入门详解
今天小编就为大家分享一篇Python 数值区间处理_对interval 库的快速入门详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2018-11-11