Paste Description for Plugging AMQP and WebSockets
see http://www.defuze.org/archives/207-plugging-amqp-and-websocketplugging-amqp-and-websocket.html
- Plugging AMQP and WebSockets
- Saturday, June 12th, 2010 at 11:45:38am MDT
- # -*- coding: utf-8 -*-
- import sys
- import time
- import select
- import logging
- from logging import handlers
- if hasattr(select, "poll"):
- from asyncore import poll2 as poll
- else:
- from asyncore import poll
- import pika
- import tornado
- import tornado.httpserver
- import tornado.ioloop
- from tornado import websocket
- from cherrypy.process import wspbus, plugins
class MyBus(wspbus.Bus):
    """CherryPy process bus whose main loop is driven by the Tornado IOLoop.

    The bus publishes on its ``"main"`` channel at a fixed interval from
    inside the IOLoop, and routes everything published on the ``"log"``
    channel to a standard :mod:`logging` logger writing to stdout.
    """

    # Seconds between two consecutive publications on the "main" channel.
    MAIN_INTERVAL = 0.1

    def __init__(self, name=""):
        """Set up logging and schedule the first ``"main"`` tick.

        :param name: name of the :mod:`logging` logger used by this bus.
        """
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)

        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.ioloop.add_callback(self.call_main)

    def call_main(self):
        """Publish on the ``"main"`` channel, then re-arm for the next tick."""
        self.publish('main')
        # Re-arm with a timer instead of time.sleep(): sleeping inside an
        # IOLoop callback stalls every other handler served by the loop for
        # the whole interval, whereas a timeout lets the loop keep serving
        # I/O until the next tick is due.
        self.ioloop.add_timeout(time.time() + self.MAIN_INTERVAL,
                                self.call_main)

    def block(self):
        """Run the IOLoop until interrupted with Ctrl-C, then exit the bus."""
        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            self.ioloop.stop()
            self.exit()

    def exit(self):
        """Run the base bus exit, then flush and close the log handlers."""
        wspbus.Bus.exit(self)
        # Closed last so that messages logged during exit are still written.
        self.close_logger()

    def open_logger(self, name=""):
        """Create the INFO-level stdout logger and store it as ``self.logger``.

        :param name: logger name handed to :func:`logging.getLogger`.
        """
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)

        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter(
            "[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)

        self.logger = logger

    def close_logger(self):
        """Flush and close every handler attached to ``self.logger``."""
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()

    def _log(self, msg="", level=logging.INFO):
        """Listener for the ``"log"`` channel: forward *msg* to the logger.

        :param msg: text to log.
        :param level: :mod:`logging` level for the record.
        """
        self.logger.log(level, msg)
class WS2AMQPPlugin(plugins.SimplePlugin):
    """Bus plugin relaying messages between the bus and an AMQP broker.

    Messages published on the bus channel ``"ws2amqp"`` are sent to the
    exchange; messages consumed from the queue are published on the bus
    channel ``"amqp2ws"``.
    """

    # Names of the AMQP exchange and queue this plugin declares and deletes.
    EXCHANGE = "X"
    QUEUE = "Q"

    def __init__(self, bus):
        """Connect to the broker on localhost and hook into *bus*.

        :param bus: the process bus this plugin publishes and listens on.
        """
        plugins.SimplePlugin.__init__(self, bus)

        params = pika.ConnectionParameters('localhost')
        self.conn = pika.AsyncoreConnection(params)
        self.channel = self.conn.channel()
        self._declare_topology()

        self.bus.subscribe("ws2amqp", self.ws2amqp)
        self.bus.subscribe("stop", self.cleanup)

    def _declare_topology(self):
        """Declare the exchange and queue, bind them, and start consuming."""
        chan = self.channel
        chan.exchange_declare(exchange=self.EXCHANGE, type="direct",
                              durable=False)
        chan.queue_declare(queue=self.QUEUE, durable=False, exclusive=False)
        chan.queue_bind(queue=self.QUEUE, exchange=self.EXCHANGE,
                        routing_key="")
        chan.basic_consume(self.amqp2ws, queue=self.QUEUE)

    def cleanup(self):
        """Listener for ``"stop"``: detach from the bus and drop AMQP state."""
        self.bus.unsubscribe("ws2amqp", self.ws2amqp)
        self.bus.unsubscribe("stop", self.cleanup)

        chan = self.channel
        chan.queue_delete(queue=self.QUEUE)
        chan.exchange_delete(exchange=self.EXCHANGE)
        self.conn.close()

    def amqp2ws(self, ch, method, header, body):
        """Consumer callback: publish *body* on ``"amqp2ws"``, then ack it.

        :param ch: channel the message arrived on; used to send the ack.
        :param method: delivery metadata; supplies ``delivery_tag``.
        :param header: message properties (unused).
        :param body: message payload forwarded to the bus.
        """
        self.bus.publish("amqp2ws", body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def ws2amqp(self, message):
        """Listener for ``"ws2amqp"``: log *message* and publish it to AMQP.

        :param message: payload to send to the exchange.
        """
        self.bus.log("Publishing to AMQP: %s" % message)
        self.channel.basic_publish(exchange=self.EXCHANGE, routing_key="",
                                   body=message)
# Module-level bus instance; it is handed to the Tornado application
# settings in the script entry point below.
bus = MyBus()


class WebSocket2AMQP(websocket.WebSocketHandler):
    """WebSocket handler bridging one client connection to the bus.

    Incoming client messages are published on the bus channel ``"ws2amqp"``
    and echoed back; messages published on ``"amqp2ws"`` are written to the
    client.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the handler and start listening on ``"amqp2ws"``."""
        websocket.WebSocketHandler.__init__(self, *args, **kwargs)
        self._bus().subscribe("amqp2ws", self.push_message)

    def _bus(self):
        """Return the bus stored under ``'bus'`` in the handler settings."""
        return self.settings['bus']

    def open(self):
        """Register ``on_message`` as the callback for the next message."""
        self.receive_message(self.on_message)

    def on_message(self, message):
        """Publish *message* on ``"ws2amqp"`` and echo it to the client.

        :param message: payload received from the WebSocket client.
        """
        self._bus().publish("ws2amqp", message)
        self.write_message(message)
        # Registered again after every message; presumably receive_message
        # is one-shot in this Tornado API -- TODO confirm.
        self.receive_message(self.on_message)

    def on_connection_close(self):
        """Stop listening on ``"amqp2ws"`` when the connection closes."""
        self._bus().unsubscribe("amqp2ws", self.push_message)

    def push_message(self, message):
        """Listener for ``"amqp2ws"``: write *message* to the client.

        :param message: payload to send over the WebSocket.
        """
        self.write_message(message)
if __name__ == '__main__':
    # Imported explicitly: the file only imports tornado, tornado.httpserver,
    # tornado.ioloop and tornado.websocket, so tornado.web.Application would
    # otherwise resolve only as a side effect of another module importing it.
    import tornado.web

    # The bus is passed as an application setting so that handlers can
    # reach it through self.settings['bus'].
    application = tornado.web.Application([
        (r"/ws", WebSocket2AMQP),
    ], bus=bus)

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)

    # Run one asyncore poll on every "main" tick of the bus.
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()

    bus.start()
    bus.block()
Paste Details
Tags: python amqp websockets
advertising
Update the Post
Either update this post and resubmit it with changes, or make a new post.
You may also comment on this post.
Please note that information posted here will expire by default in one month. If you do not want it to expire, please set the expiry time above. If it is set to expire, web search engines will not be allowed to index it prior to it expiring. Items that are not marked to expire will be indexable by search engines. Be careful with your passwords. All illegal activities will be reported and any information will be handed over to the authorities, so be good.