lolbot/lolbot.py

373 lines
11 KiB
Python
Executable file

#!/usr/bin/env python
#
# LolBot
#
# Code originally based on example bot and irc-bot class from
# Joel Rosdahl <joel@rosdahl.net>, author of included python-irclib.
"""
Useful bot for folks stuck behind censor walls at work
Logs a channel and collects URLs for later.
"""
try:
import sys, string, random, time
from ircbot import SingleServerIRCBot, OutputManager
from irclib import nm_to_n, nm_to_h, irc_lower
import os
from datetime import datetime
from mechanize import Browser
import getopt
from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, engine_from_config
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
except ImportError:
print "Some modules could not be loaded: Lolbot relies on Mechanize and SQLAlchemy.\n"
sys.exit
# Exclamations - wrong input
exclamations = [
"Zing!",
"Burns!",
"Tard!",
"Lol.",
"Crazy!",
"WTF?",
]
# Ponderings
ponderings = [
"Hi, can I have a medium lamb roast, with just potatoes.",
"Can I slurp on your Big Cock?",
"Quentin Tarantino is so awesome I want to have his babies.",
"No it's a week night 8pm is past my bedtime.",
]
SqlBase = declarative_base()
class Log(SqlBase):
"""
This class represents an event in the log table and inherits from a SQLAlchemy
convenience ORM class.
"""
__tablename__ = "log"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime)
nickname = Column(String(20))
text = Column(Text)
def __init__(self, nickname, text, timestamp=None):
if timestamp is None:
timestamp = datetime.now()
self.timestamp = timestamp
self.nickname = nickname
self.text = text
def __repr__(self):
return "(%s) %s: %s" % (self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), self.nickname, self.text)
class Url(SqlBase):
"""
This class represents a saved URL and inherits from a SQLAlchemy convenience
ORM class.
"""
__tablename__ = "url"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime)
nickname = Column(String(20))
url = Column(String(200), unique=True)
title = Column(Text)
def __init__(self, nickname, url, title=None, timestamp=None):
if timestamp is None:
timestamp = datetime.now()
self.timestamp = timestamp
self.nickname = nickname
self.url = url
self.title = title
# populate the title from the URL if not given.
if title is None:
try:
br = Browser()
br.open(self.url)
self.title = br.title()
except Exception as ex:
self.title = ''
def __repr__(self):
if not self.title:
return "%s: %s" % (self.nickname, self.url)
else:
return "%s: %s - %s" % (self.nickname, self.url, self.title)
class LolBot(SingleServerIRCBot):
"""
The Lolbot itself.
"""
def __init__(self, config_path=''):
self.get_config(config_path)
SingleServerIRCBot.__init__(self, [(self.server, self.port)], self.nickname, self.nickname)
# connect to the database
self.dbengine = engine_from_config(self.config, prefix="db.")
SqlBase.metadata.bind = self.dbengine
SqlBase.metadata.create_all()
self.get_session = sessionmaker(bind=self.dbengine)
self.helptext = "Adds URLs to a list. Commands: list - prints a bunch of URLs; clear - clears the list; lol - say something funny; <url> - adds the URL to the list; help - this message."
self.queue = OutputManager(self.connection)
self.queue.start()
self.start()
def get_config(self, config_path):
"""
This method loads configuration options from a lolbot.conf file. The file
should look like this:
irc.server = irc.freenode.net
irc.port = 6667
irc.channel = #lolbottest
irc.nickname = lolbot
db.url = sqlite:///lolbot.db
"""
if not config_path:
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf")
if not os.path.exists(config_path):
print "Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option."
usage()
sys.exit(1)
# open the configuration file and grab all param=value declarations.
config = {}
with open(config_path) as f:
for line in f:
# skip comments
if line.strip().startswith("#") or line.strip() == "":
continue
# collect up param = value
try:
(param, value) = line.strip().split("=", 1)
if param.strip() != "":
config[param.strip()] = value.strip()
except ValueError:
continue
# validate IRC host
if "irc.server" not in config.keys():
print "Error: the IRC server was not specified. Use --help for more information."
sys.exit(1)
self.server = config["irc.server"]
# validate IRC port
if "irc.port" not in config.keys():
config["irc.port"] = "6667"
try:
self.port = int(config["irc.port"])
except ValueError:
print "Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information."
sys.exit(1)
# validate IRC channel
if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"):
print "Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information."
sys.exit(1)
self.channel = config["irc.channel"]
# validate bot nickname
if "irc.nickname" not in config.keys():
config["irc.nickname"] = "lolbot"
self.nickname = config["irc.nickname"]
self.config = config
def now(self):
return datetime.today().strftime("%Y-%m-%d %H:%M:%S")
def save_url(self, nickname, url):
try:
db = self.get_session()
if not db.query(Url).filter(Url.url == url).count():
theurl = Url(nickname, url)
db.add(theurl)
db.commit()
else:
try:
theurl = db.query(Url).filter(Url.url == url).one()
print theurl
return theurl.title
except MultipleResultsFound, ex:
print ex #wtf
except NoResultsFound, ex:
print ex #wtf
except Exception, ex:
print "Exception caught saving URL: %s" % ex
return ""
def log_event(self, nick, text):
try:
entry = Log(nick, text)
db = self.get_session()
db.add(entry)
db.commit()
print entry
except Exception, ex:
print "Exception caught logging event: %s" % ex
def on_nicknameinuse(self, connection, event):
self.nickname = connection.get_nickname() + "_"
connection.nick(self.nickname)
def on_welcome(self, connection, event):
connection.join(self.channel)
def on_privmsg(self, connection, event):
"Deal with a /msg private message."
from_nick = nm_to_n(event.source())
self.do_command(event, event.arguments()[0], from_nick)
def on_pubmsg(self, connection, event):
"Deal with a public message in a channel."
# log it
from_nick = nm_to_n(event.source())
self.log_event(from_nick, event.arguments()[0])
args = string.split(event.arguments()[0], ":", 1)
if len(args) > 1 and irc_lower(args[0]) == irc_lower(self.nickname):
self.do_command(event, string.strip(args[1]), from_nick)
else:
# parse it for links, add URLs to the list
words = event.arguments()[0].split(" ")
for w in words:
if w.startswith('http://') or w.startswith('https://'):
title = self.save_url(from_nick, w)
self.say_public(title)
def say_public(self, text):
"Print TEXT into public channel, for all to see."
self.queue.send(text, self.channel)
self.log_event(self.nickname, text)
def say_private(self, nick, text):
"Send private message of TEXT to NICK."
self.queue.send(text, nick)
def reply(self, text, to_private=None):
"Send TEXT to either public channel or TO_PRIVATE nick (if defined)."
if to_private is not None:
self.say_private(to_private, text)
else:
self.say_public(text)
def ponder(self):
"Return a random pondering."
return random.choice(ponderings)
def exclaim(self):
"Return a random exclamation string."
return random.choice(exclamations)
def do_command(self, event, cmd, from_private):
"""
This is the function called whenever someone sends a public or
private message addressed to the bot. (e.g. "bot: blah").
"""
if event.eventtype() == "pubmsg":
target = None
else:
target = from_private.strip()
try:
if cmd == 'help':
self.reply(self.helptext, target)
elif cmd == 'lol':
self.reply(self.ponder(), target)
elif cmd == 'urls' or cmd == 'list':
db = self.get_session()
for url in db.query(Url).order_by(Url.timestamp):
line = "%s %s" % (url.url, url.title)
self.reply(line, target)
time.sleep(1)
elif cmd.startswith('http:') or cmd.startswith('https:'):
title = self.save_url(from_private, cmd)
if title == '':
self.reply('URL added.', target)
if title != '':
self.reply('URL added: %s' % title, target)
else:
self.reply(self.exclaim(), target)
except Exception, ex:
print "Exception caught processing command: %s" % ex
print " command was '%s' from %s" % (cmd, target)
def usage():
print """Run a lolbot.
-h, --help
This message.
-c, --config=<filename>
Specify a configuration file. Defaults to lolbot.conf in the same
directory as the script.
Configuration:
irc.server = <host>
The IRC server, e.g. irc.freenode.net
irc.port = <port>
The IRC server port. Default: 6667
irc.channel = <channel>
The chat channel to join, e.g. #trainspotting
irc.nickname = <nickname>
The nickname for your lolbot. Default: "lolbot"
db.url = <url>
The URL to your lolbot database. This can be any valid URL accepted
by SQL Alchemy. If not specified, lolbot will attempt to use a
SQLite database called lolbot.db in the same directory as the
script.
"""
if __name__ == "__main__":
try:
(options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ])
except getopt.GetoptError, err:
print str(err)
usage()
sys.exit(2)
config_path = ""
for option,value in options:
if option in ("-h", "--help"):
usage()
sys.exit(2)
if option in ("-c", "--config"):
config_path = value
try:
LolBot(config_path).start()
except KeyboardInterrupt:
print "Shutting down."