再见少年拉满弓,不惧岁月不惧风

使用kombu连接RabbitMQ

queues.py __author__ = 'nate' from kombu import Exchange, Queue test_exchange = Exchange('test-exchange', type='direct') data = ['abc', 'def', 'ghi'] test_queues = [Queue(name, test_exchange, routing_key=name) for name in data] client.py __author__ = 'nate' from kombu.pools import producers from kombu import Connection from queues import data from queues import test_exchange from datetime import datetime if __name__ == '__main__': url = 'amqp://nate:[email protected]:5672/test_vhost' payload = {'content': 'This is test at {now}'.format(now=datetime.now())} with Connection(url) as conn: with producers[conn].

使用SimpleXMLRPCServer编写RPC

服务端代码 #coding=utf-8 __author__ = 'nate' from SimpleXMLRPCServer import SimpleXMLRPCServer from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler from datetime import datetime class Test: def __init__(self): self.datetime = datetime def test_in_class(self, a, b): return a + b def _test_private(self): print 'test private funcation' def test(a, b): return a+b if __name__ == '__main__': port = 1234 try: server = SimpleXMLRPCServer(('localhost', port), SimpleXMLRPCRequestHandler, allow_none=True) server.register_function(test, 'test') server.register_instance(Test(), allow_dotted_names=True) print 'listening on port {port}'.format(port=port) server.serve_forever() except KeyboardInterrupt: print 'exit' 客户端代码 #coding=utf-8 __author__ = 'nate' import xmlrpclib if __name__ == '__main__': try: port = 1234 server = xmlrpclib.

在ZF2使用Soap调用Web Service

代码 public function testAction() { try { $client = new \Zend\Soap\Client(); $client->setWSDL('http://127.0.0.1/test/ws/query/event/detail?wsdl'); $client->setSoapVersion(SOAP_1_1); $result = $client->run("ssdsds"); var_dump($result); exit; } catch (\SoapFault $sf) { echo 'Error: [', $sf->getCode(), '] ', $sf->getMessage() ; } catch (\Exception $ex) { echo 'Exception: [', $ex->getCode(), '] ', $ex->getMessage(); } exit; }

Python with RabbitMQ to create direct Exchange

接收端 #coding=utf-8 __author__ = 'nate' import pika from pika.exceptions import ProbableAccessDeniedError from pika.exceptions import ProbableAuthenticationError def get(ch, method, props, body): print 'message: %r' % body if __name__ == '__main__': parameters = pika.URLParameters('amqp://nate:[email protected]:5672/test_vhost') conn = None channel = None try: conn = pika.BlockingConnection(parameters) channel = conn.channel() channel.exchange_declare(exchange='logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name, routing_key='test') channel.basic_consume(get, queue=queue_name, no_ack=True) channel.start_consuming() except ProbableAccessDeniedError: print 'Access Error' except ProbableAuthenticationError: print 'Auth Error' finally: if channel: channel.

Python with RabbitMQ to create topic Exchange

说明 * 表示匹配一个词 # 表示一个或多个词 接收端 #codig=utf-8 __author__ = 'nate' import pika from pika.exceptions import ProbableAccessDeniedError from pika.exceptions import ProbableAuthenticationError def get(ch, method, props, body): print 'message: %r' % body if __name__ == '__main__': parameters = pika.URLParameters('amqp://nate:[email protected]:5672/test_vhost') conn = None channel = None try: conn = pika.BlockingConnection(parameters) channel = conn.channel() channel.exchange_declare(exchange='t_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #routing_key = '#' #routing_key = 'log.*' routing_key = 'log.#' channel.queue_bind(exchange='t_logs', queue=queue_name, routing_key=routing_key) channel.

在ZF2中使用RabbitMQ

安装 安装信息加入composer.json { "require": { "videlalvaro/php-amqplib": "v2.1.0" } } 执行命令 php composer.phar install 配置 在config/autoload增加amqp.local.php <?php return array( 'amqp' => array( 'host' => '10.211.55.61', 'port' => 5672, 'user' => 'nate', 'password' => '123123', 'vhost' => 'test_vhost', 'debug' => true, ), ); Action代码 public function sendAction() { $config = $this->getServiceLocator()->get('config')['amqp']; $channel = null; $conn = null; try { $conn = new AMQPConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']); $channel = $conn->channel(); $exchange = 'logs_2'; //direct exchange $channel->exchange_declare($exchange, 'direct', false, false, false); //fanout exchange //$channel->exchange_declare($exchange, 'fanout', false, false, false); //topic exchange //$channel->exchange_declare($exchange, 'topic', false, false, false); //direct key $key = 'test'; //topic key //$key = 'log.
0%