lolbot/lolbot.py

428 lines
14 KiB
Python
Executable file

#!/usr/bin/env python
#
# LolBot
#
# New version based on Twisted IRC.
"""
Useful bot for folks stuck behind censor walls at work
Logs a channel and collects URLs for later.
"""
# Standard-library modules are imported unconditionally so that `sys` is
# guaranteed to be available to the error handler below.
import sys
import os
import string
import re
import random
import time
import getopt
import sqlite3
from datetime import datetime

# Only the third-party dependencies are guarded: a missing package should give
# the user a readable hint instead of a bare traceback.
try:
    from twisted.words.protocols import irc
    from twisted.internet import protocol
    from twisted.internet import reactor
    from mechanize import Browser
    from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
except ImportError:
    print("Some modules missing: Lolbot relies on Twisted IRC, Mechanize and SQLAlchemy.\n")
    # sys.exit has to be *called*; a bare `sys.exit` is a no-op expression and
    # would let execution fall through to a NameError further down.
    sys.exit(1)

# Declarative base shared by every ORM-mapped class in this module.
SqlBase = declarative_base()
class Log(SqlBase):
    """
    ORM mapping for one row of the "log" table: a single logged line of text
    together with the nickname it is attributed to and a timestamp.
    """

    __tablename__ = "log"

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime)
    nickname = Column(String(20))
    text = Column(Text)

    def __init__(self, nickname, text, timestamp=None):
        """
        Build a log entry for `text` attributed to `nickname`.

        When `timestamp` is omitted, the current local time (datetime.now())
        is recorded instead.
        """
        self.nickname = nickname
        self.text = text
        self.timestamp = datetime.now() if timestamp is None else timestamp

    def __repr__(self):
        # Rendered as "(YYYY-MM-DD HH:MM:SS) nick: text".
        stamp = self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        return "(%s) %s: %s" % (stamp, self.nickname, self.text)
class Url(SqlBase):
    """
    This class represents a saved URL and inherits from a SQLAlchemy convenience
    ORM class.

    Each row stores the URL itself (unique), who posted it, when it was
    recorded, and the title of the page it points to.
    """

    __tablename__ = "url"

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime)
    nickname = Column(String(20))
    url = Column(String(200), unique=True)
    title = Column(Text)

    def __init__(self, nickname, url, title=None, timestamp=None):
        """
        Create a URL record.

        nickname  -- who posted the URL.
        url       -- the URL to store.
        title     -- page title; when None it is fetched from the URL
                     (best effort, falling back to an empty string).
        timestamp -- when the URL was seen; defaults to datetime.now().
        """
        if timestamp is None:
            timestamp = datetime.now()
        self.timestamp = timestamp
        self.nickname = nickname
        self.url = url
        self.title = title
        # populate the title from the URL if not given.
        if title is None:
            self.title = self._fetch_title(url)

    @staticmethod
    def _fetch_title(url):
        """
        Return the title of the page at `url`, or '' if it cannot be
        determined (network error, non-HTML content, page without a title).
        """
        br = None
        try:
            br = Browser()
            br.open(url)
            # A page without a title must not leak None into the stored
            # record: callers interpolate the title into chat messages.
            return br.title() or ''
        except Exception:
            # Title lookup is deliberately best-effort; any failure simply
            # means the URL is stored without a title.
            return ''
        finally:
            # Release the browser's response/history whether or not the
            # fetch succeeded.
            if br is not None:
                br.close()

    def __repr__(self):
        if not self.title:
            return "%s: %s" % (self.nickname, self.url)
        return "%s: %s - %s" % (self.nickname, self.url, self.title)
class LolBot(irc.IRCClient):
    """
    The Lolbot itself.

    Logs every public message of the configured channel to the database,
    collects URLs (and a few kinds of ticket references) mentioned in the
    channel, and answers a small set of commands addressed to it either
    publicly ("<nick>: <command>") or via private message.
    """

    # Ask IRCClient to space outgoing lines at least one second apart. This
    # throttles multi-line replies without calling time.sleep(), which would
    # freeze the reactor (and with it all other IRC traffic) while a long
    # listing is being sent.
    lineRate = 1

    # NOTE(review): the text advertises a "clear" command, but do_command()
    # does not implement one.
    helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); clear - clears the list; <url> - adds the URL to the list; help - this message."

    # Ticket references recognised in channel chatter, as tuples of
    # (anchored pattern, match group substituted into the template, template).
    _REFERENCES = (
        # Moodle tracker bugs, e.g. MDL-12345 (the whole word goes in the URL)
        (re.compile(r'^MDL-([0-9]+)$'), 0, 'https://tracker.moodle.org/browse/%s'),
        # Work requests, e.g. WR12345 or WR#12345
        (re.compile(r'^WR#?([0-9]+)$'), 1, 'https://wrms.catalyst.net.nz/%s'),
        # Bugzilla, e.g. BUG123 or BUG#123
        (re.compile(r'^BUG#?([0-9]+)$'), 1, 'http://bugzilla.catalyst.net.nz/show_bug.cgi?id=%s'),
    )

    def _get_connection(self):
        """
        Open the raw sqlite3 connection used by the SQLAlchemy engine.

        The file is taken from the 'db.file' configuration option; when that
        is missing, lolbot.db next to this script is used.
        """
        db_file = self.config.get('db.file')
        if not db_file:
            db_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lolbot.db')
        connection = sqlite3.Connection(db_file)
        connection.text_factory = str
        return connection

    def created(self, when):
        """
        IRCClient callback; used here to connect to the database, create any
        missing tables and prepare the session factory.
        """
        self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection)
        SqlBase.metadata.bind = self.dbengine
        # Pass the engine explicitly rather than relying on the bound
        # metadata alone.
        SqlBase.metadata.create_all(bind=self.dbengine)
        self.get_session = sessionmaker(bind=self.dbengine)

    def now(self):
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
        return datetime.today().strftime("%Y-%m-%d %H:%M:%S")

    def save_url(self, nickname, url):
        """
        Store `url` (posted by `nickname`) unless it is already known.

        Returns the title of the stored (new or pre-existing) URL, or False
        if anything went wrong while saving.
        """
        title = False
        db = None
        try:
            db = self.get_session()
            theurl = db.query(Url).filter(Url.url == url).first()
            if theurl is None:
                theurl = Url(nickname, url)
                db.add(theurl)
                db.commit()
            print(theurl)
            title = theurl.title
        except Exception as ex:
            print("Exception caught saving URL: %s" % ex)
        finally:
            # Always hand the connection back, also after a failed commit.
            if db is not None:
                db.close()
        return title

    def log_event(self, nick, text):
        """Write one line said by `nick` to the log table (best effort)."""
        db = None
        try:
            entry = Log(nick, text)
            db = self.get_session()
            db.add(entry)
            db.commit()
            print(entry)
        except Exception as ex:
            print("Exception caught logging event: %s" % ex)
        finally:
            if db is not None:
                db.close()

    def _get_nickname(self):
        # A nickname assigned on this instance takes precedence over the
        # configured one.
        current = self.__dict__.get('_current_nickname')
        if current is not None:
            return current
        return self.factory.nickname

    def _set_nickname(self, value):
        # NOTE(review): IRCClient is understood to assign self.nickname itself
        # (e.g. once the server confirms or changes the nick); a read-only
        # property would raise AttributeError there. Confirm against the
        # Twisted version in use.
        self.__dict__['_current_nickname'] = value

    nickname = property(_get_nickname, _set_nickname)

    def _get_channel(self):
        return self.factory.channel

    channel = property(_get_channel)

    def _get_config(self):
        return self.factory.config

    config = property(_get_config)

    def signedOn(self):
        """Join the configured channel once signed on to the server."""
        self.join(self.channel)
        print("Signed on as %s." % (self.nickname,))

    def joined(self, channel):
        print("Joined %s." % (channel,))

    def privmsg(self, user, channel, msg):
        """
        Handle an incoming message.

        Private messages are treated as commands. Public messages are
        logged; if addressed to the bot ("<nick>: ...") the remainder is run
        as a command, otherwise every word is scanned for URLs and ticket
        references.
        """
        user = user.split('!')[0]
        if channel == self.nickname:
            # Private /msg from a user
            self.do_command(msg, user, user)
            return

        # log it
        self.log_event(user, msg)
        args = msg.split(":", 1)
        if len(args) > 1 and args[0] == self.nickname:
            self.do_command(args[1].strip(), user=user)
            return

        # parse it for links, add URLs to the list
        for word in msg.split(" "):
            self._handle_word(user, word)

    def _handle_word(self, user, word):
        """Save `word` if it is a URL or a recognised ticket reference."""
        if word.startswith(('http://', 'https://')):
            title = self.save_url(user, word)
            if title is not False:
                self.say_public("URL added. %s" % title)
            return

        for pattern, group, template in self._REFERENCES:
            match = pattern.search(word)
            if match:
                url = template % match.group(group)
                title = self.save_url(user, url)
                if title is not False:
                    self.say_public("%s - %s" % (url, title))
                return

    def say_public(self, text):
        "Print TEXT into public channel, for all to see."
        self.notice(self.channel, text)
        self.log_event(self.nickname, text)

    def say_private(self, nick, text):
        "Send private message of TEXT to NICK."
        self.notice(nick, text)

    def reply(self, text, to_private=None):
        "Send TEXT to either public channel or TO_PRIVATE nick (if defined)."
        if to_private is not None:
            self.say_private(to_private, text)
        else:
            self.say_public(text)

    def do_command(self, cmd, target=None, user=None):
        """
        This is the function called whenever someone sends a public or
        private message addressed to the bot. (e.g. "bot: blah").

        cmd    -- the command text.
        target -- nick to answer privately, or None to answer in the channel.
        user   -- nick of whoever issued the command (recorded as the poster
                  of a URL added by command); falls back to `target`.
        """
        try:
            if cmd == 'help':
                self.reply(self.helptext, target)
            elif cmd in ('urls', 'list') or cmd.startswith(('urls ', 'list ')):
                self._list_urls(cmd, target)
            elif cmd.startswith(('http:', 'https:')):
                title = self.save_url(user or target, cmd)
                if title is False:
                    self.reply("Sorry, I'm useless at UTF-8.", target)
                else:
                    self.reply('URL added. %s' % title, target)
            else:
                self.reply("I'm a bot of little brain. Try help for more information.", target)
        except Exception as ex:
            print("Exception caught processing command: %s" % ex)
            print(" command was '%s' from %s" % (cmd, target))
            self.reply("Sorry, I didn't understand: %s" % cmd, target)
            self.reply(self.helptext, target)

    def _list_urls(self, cmd, target):
        """
        Answer a "list"/"urls" command: no argument lists the 10 newest
        URLs, "all" lists everything, "n" the newest n, "x-y" entries x
        through y (1-based, newest first).
        """
        hint = "Give me a number or a range of numbers, e.g. list 5 or list 11-20"
        parts = cmd.split(" ", 1)
        arg = parts[1].strip() if len(parts) > 1 else ""

        start, stop = 0, 10
        try:
            if arg == "all":
                stop = None
            elif arg.find("-") > 0:
                low, high = sorted(abs(int(v)) for v in arg.split("-", 1))
                # Entries are numbered from 1; clamp so that "0-5" does not
                # turn into a negative slice start.
                start, stop = max(low - 1, 0), high
            elif arg:
                stop = abs(int(arg))
        except ValueError:
            # One clear hint is enough; no need to also trigger the generic
            # error replies in do_command().
            self.reply(hint, target)
            return

        db = self.get_session()
        try:
            query = db.query(Url).order_by(Url.timestamp.desc())
            rows = query.all() if stop is None else query[start:stop]
            for url in rows:
                self.reply("%s %s" % (url.url, url.title), target)
        finally:
            db.close()
class LolBotFactory(protocol.ClientFactory):
    """
    Client factory that builds LolBot protocol instances and holds the
    settings they read: the parsed configuration plus the server, port,
    nickname and channel taken from it.
    """

    protocol = LolBot

    def __init__(self, config_path):
        """Load the configuration found at `config_path` (see get_config)."""
        self.config = get_config(config_path)
        self.server = self.config['irc.server']
        self.port = self.config['irc.port']
        self.nickname = self.config['irc.nickname']
        self.channel = self.config['irc.channel']

    def clientConnectionLost(self, connector, reason):
        """An established connection dropped: reconnect straight away."""
        print("Lost connection (%s), reconnecting." % (reason,))
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        """The connection attempt failed: report it and shut down."""
        print("Could not connect: %s" % (reason,))
        # Without this the reactor keeps running with nothing to do and the
        # process hangs forever instead of exiting.
        reactor.stop()
def get_options():
    """
    Parse the command line (sys.argv).

    Recognised options are -h/--help, which prints the usage text and exits,
    and -c/--config=<filename>, which names the configuration file.

    Returns a dict with the single key 'config_path'; its value is the empty
    string when no configuration file was given. Exits with status 2 on an
    unrecognised option.
    """
    try:
        options, _unused_args = getopt.getopt(sys.argv[1:], "hc:", ["help", "config="])
    except getopt.GetoptError as err:
        print(str(err))
        usage()
        sys.exit(2)

    config_path = ""
    for option, value in options:
        if option in ("-h", "--help"):
            usage()
            # Asking for help is not an error, so report success.
            sys.exit(0)
        if option in ("-c", "--config"):
            config_path = value
    return {'config_path': config_path}
def get_config(config_path):
    """
    This method loads configuration options from a lolbot.conf file. The file
    should look like this:

        irc.server = irc.freenode.net
        irc.port = 6667
        irc.channel = #lolbottest
        irc.nickname = lolbot
        db.file = lolbot.db

    Blank lines, lines starting with # and lines without a "param = value"
    shape are ignored.

    config_path -- path of the file to read; when empty, lolbot.conf in the
                   directory of this script is used.

    Returns a dict of the settings with these guarantees:
      irc.server   present (the program exits otherwise)
      irc.port     an int, 6667 if not given
      irc.channel  present and starting with # (the program exits otherwise)
      irc.nickname "lolbot" if not given
      db.file      path of the SQLite database file; if not given it is
                   taken from a "db.url = sqlite:///<path>" setting, else
                   lolbot.db in the directory of this script

    Exits with status 1 when the file is missing or a setting is invalid.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    if not config_path:
        config_path = os.path.join(script_dir, "lolbot.conf")
    if not os.path.exists(config_path):
        print("Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option.")
        usage()
        sys.exit(1)

    # open the configuration file and grab all param=value declarations.
    config = {}
    with open(config_path) as f:
        for raw_line in f:
            line = raw_line.strip()
            # skip blank lines and comments
            if not line or line.startswith("#"):
                continue
            # collect up param = value; only the first "=" separates, so the
            # value itself may contain further "=" characters.
            param, separator, value = line.partition("=")
            param = param.strip()
            if separator and param:
                config[param] = value.strip()

    # validate IRC host
    if "irc.server" not in config:
        print("Error: the IRC server was not specified. Use --help for more information.")
        sys.exit(1)

    # validate IRC port
    try:
        config["irc.port"] = int(config.get("irc.port", "6667"))
    except ValueError:
        print("Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information.")
        sys.exit(1)

    # validate IRC channel
    if not config.get("irc.channel", "").startswith("#"):
        print("Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information.")
        sys.exit(1)

    # validate bot nickname
    config.setdefault("irc.nickname", "lolbot")

    # The bot opens its database through the 'db.file' key, so make sure it
    # is always present.
    if "db.file" not in config:
        sqlite_prefix = "sqlite:///"
        db_url = config.get("db.url", "")
        if db_url.startswith(sqlite_prefix) and len(db_url) > len(sqlite_prefix):
            config["db.file"] = db_url[len(sqlite_prefix):]
        else:
            config["db.file"] = os.path.join(script_dir, "lolbot.db")

    return config
def usage():
    """
    Print the command-line help: the supported options followed by the
    settings understood in the configuration file.
    """
    # The database setting is documented as db.file because that is the key
    # the bot reads when it opens its SQLite database.
    print("""Run a lolbot.
-h, --help
This message.
-c, --config=<filename>
Specify a configuration file. Defaults to lolbot.conf in the same
directory as the script.
Configuration:
irc.server = <host>
The IRC server, e.g. irc.freenode.net
irc.port = <port>
The IRC server port. Default: 6667
irc.channel = <channel>
The chat channel to join, e.g. #trainspotting
irc.nickname = <nickname>
The nickname for your lolbot. Default: "lolbot"
db.file = <path>
The path to the SQLite database file in which lolbot stores the
channel log and the collected URLs.
""")
if __name__ == "__main__":
    args = get_options()
    # The factory loads and validates the configuration itself, so reuse its
    # settings instead of parsing the configuration file a second time here.
    factory = LolBotFactory(args['config_path'])
    try:
        reactor.connectTCP(factory.server, factory.port, factory)
        reactor.run()
    except KeyboardInterrupt:
        print("Shutting down.")