#!/usr/bin/env python # # LolBot # # New version based on Twisted IRC. """ Useful bot for folks stuck behind censor walls at work Logs a channel and collects URLs for later. """ try: import sys import os import string import random import time import getopt from twisted.words.protocols import irc from twisted.internet import protocol from twisted.internet import reactor from datetime import datetime from mechanize import Browser from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, engine_from_config from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base except ImportError: print "Some modules missing: Lolbot relies on Twisted IRC, Mechanize and SQLAlchemy.\n" sys.exit # Exclamations - wrong input exclamations = [ "Zing!", "Burns!", "Tard!", "Lol.", "Crazy!", "WTF?", ] # Ponderings ponderings = [ "Hi, can I have a medium lamb roast, with just potatoes.", "Can I slurp on your Big Cock?", "Quentin Tarantino is so awesome I want to have his babies.", "No it's a week night 8pm is past my bedtime.", ] SqlBase = declarative_base() class Log(SqlBase): """ This class represents an event in the log table and inherits from a SQLAlchemy convenience ORM class. """ __tablename__ = "log" id = Column(Integer, primary_key=True) timestamp = Column(DateTime) nickname = Column(String(20)) text = Column(Text) def __init__(self, nickname, text, timestamp=None): """ Creates an event log for the IRC logger. """ if timestamp is None: timestamp = datetime.now() self.timestamp = timestamp self.nickname = nickname self.text = text def __repr__(self): return "(%s) %s: %s" % (self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), self.nickname, self.text) class Url(SqlBase): """ This class represents a saved URL and inherits from a SQLAlchemy convenience ORM class. """ __tablename__ = "url" id = Column(Integer, primary_key=True) timestamp = Column(DateTime) nickname = Column(String(20)) url = Column(String(200), unique=True) title = Column(Text) def __init__(self, nickname, url, title=None, timestamp=None): if timestamp is None: timestamp = datetime.now() self.timestamp = timestamp self.nickname = nickname self.url = url self.title = title # populate the title from the URL if not given. if title is None: try: br = Browser() br.open(self.url) self.title = br.title() except Exception as ex: self.title = '' def __repr__(self): if not self.title: return "%s: %s" % (self.nickname, self.url) else: return "%s: %s - %s" % (self.nickname, self.url, self.title) class LolBot(irc.IRCClient): """ The Lolbot itself. """ def created(self, when): # connect to the database self.dbengine = engine_from_config(self.config, prefix="db.") SqlBase.metadata.bind = self.dbengine SqlBase.metadata.create_all() self.get_session = sessionmaker(bind=self.dbengine) self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); clear - clears the list; lol - say something funny; - adds the URL to the list; help - this message." def now(self): return datetime.today().strftime("%Y-%m-%d %H:%M:%S") def save_url(self, nickname, url): title = False try: db = self.get_session() if not db.query(Url).filter(Url.url == url).count(): theurl = Url(nickname, url) db.add(theurl) db.commit() else: theurl = db.query(Url).filter(Url.url == url).one() print theurl title = theurl.title except Exception, ex: print "Exception caught saving URL: %s" % ex return title def log_event(self, nick, text): try: entry = Log(nick, text) db = self.get_session() db.add(entry) db.commit() print entry except Exception, ex: print "Exception caught logging event: %s" % ex def _get_nickname(self): return self.factory.nickname nickname = property(_get_nickname) def _get_channel(self): return self.factory.channel channel = property(_get_channel) def _get_config(self): return self.factory.config config = property(_get_config) def signedOn(self): self.join(self.channel) print "Signed on as %s." % (self.nickname,) def joined(self, channel): print "Joined %s." % (channel,) def privmsg(self, user, channel, msg): if channel != self.channel: # Private /msg from a user self.do_command(msg, user) else: # log it self.log_event(user, msg) args = string.split(msg, ":", 1) if len(args) > 1 and args[0] == self.nickname: self.do_command(string.strip(args[1])) else: # parse it for links, add URLs to the list words = msg.split(" ") for w in words: if w.startswith('http://') or w.startswith('https://'): title = self.save_url(user, w) if title == False: self.say_public("Sorry, I'm useless at UTF-8.") else: self.say_public("URL added. %s" % title) def say_public(self, text): "Print TEXT into public channel, for all to see." self.msg(self.channel, text) self.log_event(self.nickname, text) def say_private(self, nick, text): "Send private message of TEXT to NICK." self.msg(nick, text) def reply(self, text, to_private=None): "Send TEXT to either public channel or TO_PRIVATE nick (if defined)." if to_private is not None: self.say_private(to_private, text) else: self.say_public(text) def ponder(self): "Return a random pondering." return random.choice(ponderings) def exclaim(self): "Return a random exclamation string." return random.choice(exclamations) def do_command(self, cmd, target=None): """ This is the function called whenever someone sends a public or private message addressed to the bot. (e.g. "bot: blah"). """ target = target.strip() try: if cmd == 'help': self.reply(self.helptext, target) elif cmd == 'lol': self.reply(self.ponder(), target) elif cmd == 'urls' or cmd == 'list': db = self.get_session() for url in db.query(Url).order_by(Url.timestamp.desc())[:10]: line = "%s %s" % (url.url, url.title) self.reply(line, target) time.sleep(1) elif cmd.startswith('urls ') or cmd.startswith('list '): db = self.get_session() (listcmd, n) = cmd.split(" ", 1) n = n.strip() if n == "all": rows = db.query(Url).order_by(Url.timestamp.desc()) elif n.find("-") > 0: (x, y) = n.split("-", 1) try: x = abs(int(x)) y = abs(int(y)) if y < x: x, y = y, x except ValueError, ex: self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target) raise ex rows = db.query(Url).order_by(Url.timestamp.desc())[x-1:y] else: try: n = abs(int(n)) except ValueError, ex: self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target) raise ex rows = db.query(Url).order_by(Url.timestamp.desc())[:n] for url in rows: line = "%s %s" % (url.url, url.title) self.reply(line, target) time.sleep(1) elif cmd.startswith('http:') or cmd.startswith('https:'): title = self.save_url(from_private, cmd) if title == False: self.say_public("Sorry, I'm useless at UTF-8.") else: self.reply('URL added. %s' % title, target) else: self.reply(self.exclaim(), target) except Exception, ex: print "Exception caught processing command: %s" % ex print " command was '%s' from %s" % (cmd, target) self.reply("Sorry, I didn't understand: %s" % cmd, target) self.reply(self.helptext, target) class LolBotFactory(protocol.ClientFactory): protocol = LolBot def __init__(self, config_path): self.config = get_config(config_path) self.server = self.config['irc.server'] self.port = self.config['irc.port'] self.nickname = self.config['irc.nickname'] self.channel = self.config['irc.channel'] def clientConnectionLost(self, connector, reason): print "Lost connection (%s), reconnecting." % (reason,) connector.connect() def clientConnectionFailed(self, connector, reason): print "Could not connect: %s" % (reason,) def get_options(): try: (options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ]) except getopt.GetoptError, err: print str(err) usage() sys.exit(2) config_path = "" for option,value in options: if option in ("-h", "--help"): usage() sys.exit(2) if option in ("-c", "--config"): config_path = value return { 'config_path': config_path } def get_config(config_path): """ This method loads configuration options from a lolbot.conf file. The file should look like this: irc.server = irc.freenode.net irc.port = 6667 irc.channel = #lolbottest irc.nickname = lolbot db.url = sqlite:///lolbot.db """ if not config_path: config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf") if not os.path.exists(config_path): print "Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option." usage() sys.exit(1) # open the configuration file and grab all param=value declarations. config = {} with open(config_path) as f: for line in f: # skip comments if line.strip().startswith("#") or line.strip() == "": continue # collect up param = value try: (param, value) = line.strip().split("=", 1) if param.strip() != "": config[param.strip()] = value.strip() except ValueError: continue # validate IRC host if "irc.server" not in config.keys(): print "Error: the IRC server was not specified. Use --help for more information." sys.exit(1) # validate IRC port if "irc.port" not in config.keys(): config["irc.port"] = "6667" try: config["irc.port"] = int(config["irc.port"]) except ValueError: print "Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information." sys.exit(1) # validate IRC channel if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"): print "Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information." sys.exit(1) # validate bot nickname if "irc.nickname" not in config.keys(): config["irc.nickname"] = "lolbot" return config def usage(): print """Run a lolbot. -h, --help This message. -c, --config= Specify a configuration file. Defaults to lolbot.conf in the same directory as the script. Configuration: irc.server = The IRC server, e.g. irc.freenode.net irc.port = The IRC server port. Default: 6667 irc.channel = The chat channel to join, e.g. #trainspotting irc.nickname = The nickname for your lolbot. Default: "lolbot" db.url = The URL to your lolbot database. This can be any valid URL accepted by SQL Alchemy. If not specified, lolbot will attempt to use a SQLite database called lolbot.db in the same directory as the script. """ if __name__ == "__main__": args = get_options() config = get_config(args['config_path']) try: reactor.connectTCP(config['irc.server'], config['irc.port'], LolBotFactory(args['config_path'])) reactor.run() except KeyboardInterrupt: print "Shutting down."