430 lines
14 KiB
Python
430 lines
14 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# LolBot
|
|
#
|
|
# New version based on Twisted IRC.
|
|
|
|
"""
|
|
Useful bot for folks stuck behind censor walls at work
|
|
Logs a channel and collects URLs for later.
|
|
"""
|
|
|
|
try:
|
|
import sys
|
|
import os
|
|
import string
|
|
import random
|
|
import time
|
|
import getopt
|
|
import sqlite3
|
|
from twisted.words.protocols import irc
|
|
from twisted.internet import protocol
|
|
from twisted.internet import reactor
|
|
from datetime import datetime
|
|
from mechanize import Browser
|
|
from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
except ImportError:
|
|
print "Some modules missing: Lolbot relies on Twisted IRC, Mechanize and SQLAlchemy.\n"
|
|
sys.exit
|
|
|
|
# Exclamations - wrong input
|
|
exclamations = [
|
|
"Zing!",
|
|
"Burns!",
|
|
"Tard!",
|
|
"Lol.",
|
|
"Crazy!",
|
|
"WTF?",
|
|
]
|
|
|
|
# Ponderings
|
|
ponderings = [
|
|
"Hi, can I have a medium lamb roast, with just potatoes.",
|
|
"Can I slurp on your Big Cock?",
|
|
"Quentin Tarantino is so awesome I want to have his babies.",
|
|
"No it's a week night 8pm is past my bedtime.",
|
|
]
|
|
|
|
SqlBase = declarative_base()
|
|
|
|
class Log(SqlBase):
|
|
"""
|
|
This class represents an event in the log table and inherits from a SQLAlchemy
|
|
convenience ORM class.
|
|
"""
|
|
|
|
__tablename__ = "log"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
timestamp = Column(DateTime)
|
|
nickname = Column(String(20))
|
|
text = Column(Text)
|
|
|
|
def __init__(self, nickname, text, timestamp=None):
|
|
"""
|
|
Creates an event log for the IRC logger.
|
|
"""
|
|
if timestamp is None:
|
|
timestamp = datetime.now()
|
|
self.timestamp = timestamp
|
|
self.nickname = nickname
|
|
self.text = text
|
|
|
|
def __repr__(self):
|
|
return "(%s) %s: %s" % (self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), self.nickname, self.text)
|
|
|
|
class Url(SqlBase):
|
|
"""
|
|
This class represents a saved URL and inherits from a SQLAlchemy convenience
|
|
ORM class.
|
|
"""
|
|
|
|
__tablename__ = "url"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
timestamp = Column(DateTime)
|
|
nickname = Column(String(20))
|
|
url = Column(String(200), unique=True)
|
|
title = Column(Text)
|
|
|
|
def __init__(self, nickname, url, title=None, timestamp=None):
|
|
if timestamp is None:
|
|
timestamp = datetime.now()
|
|
self.timestamp = timestamp
|
|
self.nickname = nickname
|
|
self.url = url
|
|
self.title = title
|
|
|
|
# populate the title from the URL if not given.
|
|
if title is None:
|
|
try:
|
|
br = Browser()
|
|
br.open(self.url)
|
|
self.title = br.title()
|
|
except Exception as ex:
|
|
self.title = ''
|
|
|
|
def __repr__(self):
|
|
if not self.title:
|
|
return "%s: %s" % (self.nickname, self.url)
|
|
else:
|
|
return "%s: %s - %s" % (self.nickname, self.url, self.title)
|
|
|
|
class LolBot(irc.IRCClient):
|
|
"""
|
|
The Lolbot itself.
|
|
"""
|
|
|
|
def _get_connection(self):
|
|
connection = sqlite3.Connection(self.config['db.file'])
|
|
connection.text_factory = str
|
|
return connection
|
|
|
|
def created(self, when):
|
|
# connect to the database
|
|
self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection)
|
|
SqlBase.metadata.bind = self.dbengine
|
|
SqlBase.metadata.create_all()
|
|
self.get_session = sessionmaker(bind=self.dbengine)
|
|
|
|
self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); clear - clears the list; lol - say something funny; <url> - adds the URL to the list; help - this message."
|
|
|
|
def now(self):
|
|
return datetime.today().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
def save_url(self, nickname, url):
|
|
title = False
|
|
try:
|
|
db = self.get_session()
|
|
if not db.query(Url).filter(Url.url == url).count():
|
|
theurl = Url(nickname, url)
|
|
db.add(theurl)
|
|
db.commit()
|
|
else:
|
|
theurl = db.query(Url).filter(Url.url == url).one()
|
|
print theurl
|
|
title = theurl.title
|
|
except Exception, ex:
|
|
print "Exception caught saving URL: %s" % ex
|
|
return title
|
|
|
|
def log_event(self, nick, text):
|
|
try:
|
|
entry = Log(nick, text)
|
|
db = self.get_session()
|
|
db.add(entry)
|
|
db.commit()
|
|
print entry
|
|
except Exception, ex:
|
|
print "Exception caught logging event: %s" % ex
|
|
|
|
def _get_nickname(self):
|
|
return self.factory.nickname
|
|
nickname = property(_get_nickname)
|
|
|
|
def _get_channel(self):
|
|
return self.factory.channel
|
|
channel = property(_get_channel)
|
|
|
|
def _get_config(self):
|
|
return self.factory.config
|
|
config = property(_get_config)
|
|
|
|
def signedOn(self):
|
|
self.join(self.channel)
|
|
print "Signed on as %s." % (self.nickname,)
|
|
|
|
def joined(self, channel):
|
|
print "Joined %s." % (channel,)
|
|
|
|
def privmsg(self, user, channel, msg):
|
|
if channel != self.channel:
|
|
# Private /msg from a user
|
|
self.do_command(msg, user)
|
|
else:
|
|
# log it
|
|
self.log_event(user, msg)
|
|
|
|
args = string.split(msg, ":", 1)
|
|
if len(args) > 1 and args[0] == self.nickname:
|
|
self.do_command(string.strip(args[1]))
|
|
else:
|
|
# parse it for links, add URLs to the list
|
|
words = msg.split(" ")
|
|
for w in words:
|
|
if w.startswith('http://') or w.startswith('https://'):
|
|
title = self.save_url(user, w)
|
|
if title == False:
|
|
self.say_public("Sorry, I'm useless at UTF-8.")
|
|
else:
|
|
self.say_public("URL added. %s" % title)
|
|
|
|
def say_public(self, text):
|
|
"Print TEXT into public channel, for all to see."
|
|
self.msg(self.channel, text)
|
|
self.log_event(self.nickname, text)
|
|
|
|
def say_private(self, nick, text):
|
|
"Send private message of TEXT to NICK."
|
|
self.msg(nick, text)
|
|
|
|
def reply(self, text, to_private=None):
|
|
"Send TEXT to either public channel or TO_PRIVATE nick (if defined)."
|
|
if to_private is not None:
|
|
self.say_private(to_private, text)
|
|
else:
|
|
self.say_public(text)
|
|
|
|
def ponder(self):
|
|
"Return a random pondering."
|
|
return random.choice(ponderings)
|
|
|
|
def exclaim(self):
|
|
"Return a random exclamation string."
|
|
return random.choice(exclamations)
|
|
|
|
def do_command(self, cmd, target=None):
|
|
"""
|
|
This is the function called whenever someone sends a public or
|
|
private message addressed to the bot. (e.g. "bot: blah").
|
|
"""
|
|
|
|
target = target.strip()
|
|
|
|
try:
|
|
if cmd == 'help':
|
|
self.reply(self.helptext, target)
|
|
|
|
elif cmd == 'lol':
|
|
self.reply(self.ponder(), target)
|
|
|
|
elif cmd == 'urls' or cmd == 'list':
|
|
db = self.get_session()
|
|
for url in db.query(Url).order_by(Url.timestamp.desc())[:10]:
|
|
line = "%s %s" % (url.url, url.title)
|
|
self.reply(line, target)
|
|
time.sleep(1)
|
|
|
|
elif cmd.startswith('urls ') or cmd.startswith('list '):
|
|
db = self.get_session()
|
|
(listcmd, n) = cmd.split(" ", 1)
|
|
n = n.strip()
|
|
if n == "all":
|
|
rows = db.query(Url).order_by(Url.timestamp.desc())
|
|
elif n.find("-") > 0:
|
|
(x, y) = n.split("-", 1)
|
|
try:
|
|
x = abs(int(x))
|
|
y = abs(int(y))
|
|
if y < x:
|
|
x, y = y, x
|
|
except ValueError, ex:
|
|
self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target)
|
|
raise ex
|
|
rows = db.query(Url).order_by(Url.timestamp.desc())[x-1:y]
|
|
else:
|
|
try:
|
|
n = abs(int(n))
|
|
except ValueError, ex:
|
|
self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target)
|
|
raise ex
|
|
rows = db.query(Url).order_by(Url.timestamp.desc())[:n]
|
|
|
|
for url in rows:
|
|
line = "%s %s" % (url.url, url.title)
|
|
self.reply(line, target)
|
|
time.sleep(1)
|
|
|
|
elif cmd.startswith('http:') or cmd.startswith('https:'):
|
|
title = self.save_url(from_private, cmd)
|
|
if title == False:
|
|
self.say_public("Sorry, I'm useless at UTF-8.")
|
|
else:
|
|
self.reply('URL added. %s' % title, target)
|
|
|
|
else:
|
|
self.reply(self.exclaim(), target)
|
|
|
|
except Exception, ex:
|
|
print "Exception caught processing command: %s" % ex
|
|
print " command was '%s' from %s" % (cmd, target)
|
|
self.reply("Sorry, I didn't understand: %s" % cmd, target)
|
|
self.reply(self.helptext, target)
|
|
|
|
class LolBotFactory(protocol.ClientFactory):
|
|
protocol = LolBot
|
|
|
|
def __init__(self, config_path):
|
|
self.config = get_config(config_path)
|
|
self.server = self.config['irc.server']
|
|
self.port = self.config['irc.port']
|
|
self.nickname = self.config['irc.nickname']
|
|
self.channel = self.config['irc.channel']
|
|
|
|
def clientConnectionLost(self, connector, reason):
|
|
print "Lost connection (%s), reconnecting." % (reason,)
|
|
connector.connect()
|
|
|
|
def clientConnectionFailed(self, connector, reason):
|
|
print "Could not connect: %s" % (reason,)
|
|
|
|
def get_options():
|
|
try:
|
|
(options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ])
|
|
except getopt.GetoptError, err:
|
|
print str(err)
|
|
usage()
|
|
sys.exit(2)
|
|
|
|
config_path = ""
|
|
for option,value in options:
|
|
if option in ("-h", "--help"):
|
|
usage()
|
|
sys.exit(2)
|
|
if option in ("-c", "--config"):
|
|
config_path = value
|
|
return { 'config_path': config_path }
|
|
|
|
def get_config(config_path):
|
|
"""
|
|
This method loads configuration options from a lolbot.conf file. The file
|
|
should look like this:
|
|
|
|
irc.server = irc.freenode.net
|
|
irc.port = 6667
|
|
irc.channel = #lolbottest
|
|
irc.nickname = lolbot
|
|
db.url = sqlite:///lolbot.db
|
|
|
|
"""
|
|
|
|
if not config_path:
|
|
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf")
|
|
if not os.path.exists(config_path):
|
|
print "Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option."
|
|
usage()
|
|
sys.exit(1)
|
|
|
|
# open the configuration file and grab all param=value declarations.
|
|
config = {}
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
# skip comments
|
|
if line.strip().startswith("#") or line.strip() == "":
|
|
continue
|
|
|
|
# collect up param = value
|
|
try:
|
|
(param, value) = line.strip().split("=", 1)
|
|
if param.strip() != "":
|
|
config[param.strip()] = value.strip()
|
|
except ValueError:
|
|
continue
|
|
|
|
# validate IRC host
|
|
if "irc.server" not in config.keys():
|
|
print "Error: the IRC server was not specified. Use --help for more information."
|
|
sys.exit(1)
|
|
|
|
# validate IRC port
|
|
if "irc.port" not in config.keys():
|
|
config["irc.port"] = "6667"
|
|
try:
|
|
config["irc.port"] = int(config["irc.port"])
|
|
except ValueError:
|
|
print "Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information."
|
|
sys.exit(1)
|
|
|
|
# validate IRC channel
|
|
if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"):
|
|
print "Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information."
|
|
sys.exit(1)
|
|
|
|
# validate bot nickname
|
|
if "irc.nickname" not in config.keys():
|
|
config["irc.nickname"] = "lolbot"
|
|
|
|
return config
|
|
|
|
def usage():
|
|
print """Run a lolbot.
|
|
|
|
-h, --help
|
|
This message.
|
|
|
|
-c, --config=<filename>
|
|
Specify a configuration file. Defaults to lolbot.conf in the same
|
|
directory as the script.
|
|
|
|
Configuration:
|
|
|
|
irc.server = <host>
|
|
The IRC server, e.g. irc.freenode.net
|
|
|
|
irc.port = <port>
|
|
The IRC server port. Default: 6667
|
|
|
|
irc.channel = <channel>
|
|
The chat channel to join, e.g. #trainspotting
|
|
|
|
irc.nickname = <nickname>
|
|
The nickname for your lolbot. Default: "lolbot"
|
|
|
|
db.url = <url>
|
|
The URL to your lolbot database. This can be any valid URL accepted
|
|
by SQL Alchemy. If not specified, lolbot will attempt to use a
|
|
SQLite database called lolbot.db in the same directory as the
|
|
script.
|
|
"""
|
|
|
|
if __name__ == "__main__":
|
|
args = get_options()
|
|
config = get_config(args['config_path'])
|
|
try:
|
|
reactor.connectTCP(config['irc.server'], config['irc.port'], LolBotFactory(args['config_path']))
|
|
reactor.run()
|
|
except KeyboardInterrupt:
|
|
print "Shutting down."
|
|
|