All pastes #1881774 Raw Edit

Plugging AMQP and WebSockets

public python v1 · immutable
#1881774 ·published 2010-06-12 17:45 UTC
rendered paste body
# -*- coding: utf-8 -*-import sysimport timeimport selectimport loggingfrom logging import handlersif hasattr(select, "poll"):    from asyncore import poll2 as pollelse:    from asyncore import pollimport pikaimport tornadoimport tornado.httpserverimport tornado.ioloopfrom tornado import websocketfrom cherrypy.process import wspbus, pluginsclass MyBus(wspbus.Bus):    def __init__(self, name=""):        wspbus.Bus.__init__(self)        self.open_logger(name)        self.subscribe("log", self._log)        self.ioloop = tornado.ioloop.IOLoop.instance()        self.ioloop.add_callback(self.call_main)    def call_main(self):        self.publish('main')        time.sleep(0.1)        self.ioloop.add_callback(self.call_main)            def block(self):        ioloop = tornado.ioloop.IOLoop.instance()        try:            ioloop.start()        except KeyboardInterrupt:            ioloop.stop()            self.exit()    def exit(self):        wspbus.Bus.exit(self)        self.close_logger()    def open_logger(self, name=""):        logger = logging.getLogger(name)        logger.setLevel(logging.INFO)        h = logging.StreamHandler(sys.stdout)        h.setLevel(logging.INFO)        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))        logger.addHandler(h)        self.logger = logger    def close_logger(self):        for handler in self.logger.handlers:            handler.flush()            handler.close()                def _log(self, msg="", level=logging.INFO):        self.logger.log(level, msg)class WS2AMQPPlugin(plugins.SimplePlugin):    def __init__(self, bus):        plugins.SimplePlugin.__init__(self, bus)        self.conn = pika.AsyncoreConnection(pika.ConnectionParameters('localhost'))        self.channel = self.conn.channel()        self.channel.exchange_declare(exchange="X", type="direct", durable=False)        self.channel.queue_declare(queue="Q", durable=False, exclusive=False)        self.channel.queue_bind(queue="Q", exchange="X", routing_key="")        self.channel.basic_consume(self.amqp2ws, queue="Q")        self.bus.subscribe("ws2amqp", self.ws2amqp)        self.bus.subscribe("stop", self.cleanup)    def cleanup(self):        self.bus.unsubscribe("ws2amqp", self.ws2amqp)        self.bus.unsubscribe("stop", self.cleanup)        self.channel.queue_delete(queue="Q")        self.channel.exchange_delete(exchange="X")        self.conn.close()    def amqp2ws(self, ch, method, header, body):        self.bus.publish("amqp2ws", body)        ch.basic_ack(delivery_tag=method.delivery_tag)            def ws2amqp(self, message):        self.bus.log("Publishing to AMQP: %s" % message)        self.channel.basic_publish(exchange="X", routing_key="", body=message)        bus = MyBus()class WebSocket2AMQP(websocket.WebSocketHandler):    def __init__(self, *args, **kwargs):        websocket.WebSocketHandler.__init__(self, *args, **kwargs)        self.settings['bus'].subscribe("amqp2ws", self.push_message)            def open(self):        self.receive_message(self.on_message)    def on_message(self, message):        self.settings['bus'].publish("ws2amqp", message)        self.write_message(message)        self.receive_message(self.on_message)    def on_connection_close(self):        self.settings['bus'].unsubscribe("amqp2ws", self.push_message)    def push_message(self, message):        self.write_message(message)if __name__ == '__main__':    application = tornado.web.Application([        (r"/ws", WebSocket2AMQP),        ], bus=bus)        http_server = tornado.httpserver.HTTPServer(application)    http_server.listen(8888)        bus.subscribe("main", poll)    WS2AMQPPlugin(bus).subscribe()    bus.start()    bus.block()