Part of Slepp's Projects: Pastebin | TURL | Imagebin | Filebin
Feedback -- English French German Japanese
Create Upload Newest Tools Donate
Sign In | Create Account

Paste Description for WebSockets/AMQP chat client

see http://www.defuze.org/archives/213-a-quick-chata-quick-chat.htm

WebSockets/AMQP chat client
Monday, June 14th, 2010 at 12:36:32pm MDT 

  1. # -*- coding: utf-8 -*-
  2. import sys
  3. import random
  4. import time
  5. import select
  6. import logging
  7. from logging import handlers
  8. if hasattr(select, "poll"):
  9.     from asyncore import poll2 as poll
  10. else:
  11.     from asyncore import poll
  12.  
  13. import pika
  14. import tornado
  15. import tornado.httpserver
  16. import tornado.ioloop
  17. from tornado import websocket
  18. from cherrypy.process import wspbus, plugins
  19.  
  class MyBus(wspbus.Bus):
      """Process bus that is pumped from Tornado's IOLoop and logs via ``logging``.

      On construction the bus opens a stdout logger, routes the bus "log"
      channel to it and schedules ``call_main`` on the IOLoop singleton, so
      the "main" channel is published periodically for as long as the loop
      runs (this file subscribes asyncore's ``poll`` to that channel).
      """

      # Delay, in seconds, between two publications on the "main" channel.
      MAIN_INTERVAL = 0.1

      def __init__(self, name=""):
          """Create the bus.

          ``name`` is the name of the ``logging`` logger to use; the empty
          string selects the root logger.
          """
          wspbus.Bus.__init__(self)
          self.open_logger(name)
          self.subscribe("log", self._log)

          self.ioloop = tornado.ioloop.IOLoop.instance()
          self.ioloop.add_callback(self.call_main)

      def call_main(self):
          """Publish on the "main" channel, then re-arm for the next tick."""
          self.publish('main')
          # Re-schedule with a timer instead of time.sleep(): sleeping inside
          # an IOLoop callback blocks the loop's only thread, so no HTTP or
          # WebSocket traffic would be served during the pause.
          self.ioloop.add_timeout(time.time() + self.MAIN_INTERVAL,
                                  self.call_main)

      def block(self):
          """Run the IOLoop until interrupted, then shut the bus down.

          A KeyboardInterrupt (Ctrl-C) stops the loop and calls ``exit()``.
          """
          # self.ioloop is the IOLoop.instance() singleton stored by __init__.
          try:
              self.ioloop.start()
          except KeyboardInterrupt:
              self.ioloop.stop()
              self.exit()

      def exit(self):
          """Run the base-class exit, then flush and close the log handlers."""
          wspbus.Bus.exit(self)
          self.close_logger()

      def open_logger(self, name=""):
          """Attach an INFO-level stdout handler to logger ``name``.

          The configured logger is stored on ``self.logger``.
          """
          logger = logging.getLogger(name)
          logger.setLevel(logging.INFO)
          h = logging.StreamHandler(sys.stdout)
          h.setLevel(logging.INFO)
          h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
          logger.addHandler(h)

          self.logger = logger

      def close_logger(self):
          """Flush and close every handler attached to ``self.logger``."""
          for handler in self.logger.handlers:
              handler.flush()
              handler.close()

      def _log(self, msg="", level=logging.INFO):
          """Listener for the bus "log" channel: forward to ``self.logger``."""
          self.logger.log(level, msg)
  63.  
  class WS2AMQPPlugin(plugins.SimplePlugin):
      """Bus plugin relaying messages between the bus and an AMQP broker.

      Messages published on the bus channel "ws2amqp" are sent to exchange
      "X"; messages consumed from queue "Q" are re-published on the bus
      channel "amqp2ws".
      """

      def __init__(self, bus):
          """Connect to the broker on localhost and wire up both directions."""
          plugins.SimplePlugin.__init__(self, bus)

          params = pika.ConnectionParameters('localhost')
          self.conn = pika.AsyncoreConnection(params)
          channel = self.conn.channel()
          self.channel = channel

          # Declare the exchange and queue before binding them together.
          channel.exchange_declare(exchange="X", type="direct", durable=False)
          channel.queue_declare(queue="Q", durable=False, exclusive=False)
          channel.queue_bind(queue="Q", exchange="X", routing_key="")

          channel.basic_consume(self.amqp2ws, queue="Q")

          for topic, listener in (("ws2amqp", self.ws2amqp),
                                  ("stop", self.cleanup)):
              self.bus.subscribe(topic, listener)

      def cleanup(self):
          """Bus "stop" listener: detach from the bus and tear down AMQP state."""
          for topic, listener in (("ws2amqp", self.ws2amqp),
                                  ("stop", self.cleanup)):
              self.bus.unsubscribe(topic, listener)

          channel = self.channel
          channel.queue_delete(queue="Q")
          channel.exchange_delete(exchange="X")
          self.conn.close()

      def amqp2ws(self, ch, method, header, body):
          """Consumer callback: hand the body to the bus, then ack the delivery."""
          self.bus.publish("amqp2ws", body)
          ch.basic_ack(delivery_tag=method.delivery_tag)

      def ws2amqp(self, message):
          """Bus "ws2amqp" listener: log ``message`` and publish it to "X"."""
          note = "Publishing to AMQP: %s" % message
          self.bus.log(note)
          self.channel.basic_publish(exchange="X", routing_key="", body=message)
  92.        
  # Module-level bus instance; it is handed to the Tornado application
  # settings and to the AMQP plugin in the __main__ section of this file.
  bus = MyBus(name="")
  94.  
  class MainHandler(tornado.web.RequestHandler):
      """Serve the single-page chat client.

      The page opens a WebSocket to ``/ws`` on the host the browser used to
      reach this server, appends every incoming message to a textarea and
      sends the text field's content, prefixed with a random user name, on
      form submit.
      """

      # Address used when the request carries no usable Host value; this is
      # the address that used to be hard-coded in the page.
      DEFAULT_WS_HOST = "192.168.0.10:8888"

      def get(self):
          """Write the chat page for a freshly generated ``UserN`` name."""
          username = "User%d" % random.randint(0, 100)

          # Point the WebSocket at whatever host the client requested rather
          # than a fixed LAN address. The Host header is client-supplied, so
          # keep only characters that can appear in host[:port] before it is
          # embedded in the quoted JavaScript string below.
          requested_host = self.request.host or ""
          ws_host = "".join(ch for ch in requested_host
                            if ch.isalnum() or ch in ".-:[]")
          if not ws_host:
              ws_host = self.DEFAULT_WS_HOST

          self.write("""<html>
          <head>
            <script type='application/javascript' src='/static/jquery-1.4.2.min.js'> </script>
            <script type='application/javascript'>
              $(document).ready(function() {
                var ws = new WebSocket('ws://%(ws_host)s/ws');
                ws.onmessage = function (evt) {
                   $('#chat').val($('#chat').val() + evt.data + '\\n');
                };
                $('#chatform').submit(function() {
                   ws.send('%(username)s: ' + $('#message').val());
                   $('#message').val("");
                   return false;
                });
              });
            </script>
          </head>
          <body>
          <form action='/ws' id='chatform' method='post'>
            <textarea id='chat' cols='35' rows='10'></textarea>
            <br />
            <label for='message'>%(username)s: </label><input type='text' id='message' />
            <input type='submit' value='Send' />
            </form>
          </body>
          </html>
          """ % {'username': username, 'ws_host': ws_host})
  125.  
  class WebSocket2AMQP(websocket.WebSocketHandler):
      """WebSocket endpoint bridging one browser connection to the bus.

      Incoming socket messages are published on the bus channel "ws2amqp";
      anything published on "amqp2ws" is written back to the socket.
      """

      def __init__(self, *args, **kwargs):
          """Initialise the handler and start listening on "amqp2ws"."""
          websocket.WebSocketHandler.__init__(self, *args, **kwargs)
          self._chat_bus().subscribe("amqp2ws", self.push_message)

      def _chat_bus(self):
          """Return the bus stored in the application settings under 'bus'."""
          return self.settings['bus']

      def open(self):
          """Start waiting for the first message from the client."""
          self.receive_message(self.on_message)

      def on_message(self, message):
          """Forward ``message`` to the bus, then wait for the next one."""
          self._chat_bus().publish("ws2amqp", message)
          self.receive_message(self.on_message)

      def on_connection_close(self):
          """Stop listening on "amqp2ws" for this connection."""
          self._chat_bus().unsubscribe("amqp2ws", self.push_message)

      def push_message(self, message):
          """Bus "amqp2ws" listener: write ``message`` to the socket."""
          self.write_message(message)
  143.  
  if __name__ == '__main__':
      # URL routing: chat page at "/", WebSocket endpoint at "/ws".
      routes = [
          (r"/", MainHandler),
          (r"/ws", WebSocket2AMQP),
      ]
      application = tornado.web.Application(routes, static_path=".", bus=bus)

      http_server = tornado.httpserver.HTTPServer(application)
      http_server.listen(8888)

      # Each publication on the bus "main" channel runs one asyncore poll.
      bus.subscribe("main", poll)
      amqp_plugin = WS2AMQPPlugin(bus)
      amqp_plugin.subscribe()
      bus.start()
      bus.block()

Paste Details

advertising

Update the Post

Either update this post and resubmit it with changes, or make a new post.

You may also comment on this post.

update paste below
details of the post (optional)

Note: Only the paste content is required, though the following information can be useful to others.

Save name / title?

(space separated, optional)



Please note that information posted here will expire by default in one month. If you do not want it to expire, please set the expiry time above. If it is set to expire, web search engines will not be allowed to index it prior to it expiring. Items that are not marked to expire will be indexable by search engines. Be careful with your passwords. All illegal activities will be reported and any information will be handed over to the authorities, so be good.

worth-right
fantasy-obligation