358 lines
10 KiB
Python
Executable file
358 lines
10 KiB
Python
Executable file
#!/usr/bin/env python
|
|
#
|
|
# LolBot
|
|
#
|
|
# Code originally based on example bot and irc-bot class from
|
|
# Joel Rosdahl <joel@rosdahl.net>, author of included python-irclib.
|
|
|
|
"""
|
|
Useful bot for folks stuck behind censor walls at work
|
|
Logs a channel and collects URLs for later.
|
|
"""
|
|
|
|
import sys, string, random, time
|
|
from ircbot import SingleServerIRCBot, OutputManager
|
|
from irclib import nm_to_n, nm_to_h, irc_lower
|
|
import os
|
|
|
|
from datetime import datetime
|
|
from mechanize import Browser
|
|
|
|
import getopt
|
|
from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, engine_from_config
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
|
|
# Exclamations - wrong input
|
|
exclamations = [
|
|
"Zing!",
|
|
"Burns!",
|
|
"Tard!",
|
|
"Lol.",
|
|
"Crazy!",
|
|
"WTF?",
|
|
]
|
|
|
|
# Ponderings
|
|
ponderings = [
|
|
"Hi, can I have a medium lamb roast, with just potatoes.",
|
|
"Can I slurp on your Big Cock?",
|
|
"Your Mum likes it two in the pink one in the stink.",
|
|
"Quentin Tarantino is so awesome I want to have his babies.",
|
|
"No it's a week night 8pm is past my bedtime.",
|
|
]
|
|
|
|
SqlBase = declarative_base()
|
|
|
|
class Log(SqlBase):
|
|
"""
|
|
This class represents an event in the log table and inherits from a SQLAlchemy
|
|
convenience ORM class.
|
|
"""
|
|
|
|
__tablename__ = "log"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
timestamp = Column(DateTime)
|
|
nickname = Column(String(20))
|
|
text = Column(Text)
|
|
|
|
def __init__(self, nickname, text, timestamp=None):
|
|
if timestamp is None:
|
|
timestamp = datetime.now()
|
|
self.timestamp = timestamp
|
|
self.nickname = nickname
|
|
self.text = text
|
|
|
|
def __repr__(self):
|
|
return "(%s) %s: %s" % (self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), self.nickname, self.text)
|
|
|
|
class Url(SqlBase):
|
|
"""
|
|
This class represents a saved URL and inherits from a SQLAlchemy convenience
|
|
ORM class.
|
|
"""
|
|
|
|
__tablename__ = "url"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
timestamp = Column(DateTime)
|
|
nickname = Column(String(20))
|
|
url = Column(String(200), unique=True)
|
|
title = Column(Text)
|
|
|
|
def __init__(self, nickname, url, title=None, timestamp=None):
|
|
if timestamp is None:
|
|
timestamp = datetime.now()
|
|
self.timestamp = timestamp
|
|
self.nickname = nickname
|
|
self.url = url
|
|
self.title = title
|
|
|
|
# populate the title from the URL if not given.
|
|
if title is None:
|
|
try:
|
|
br = Browser()
|
|
br.open(self.url)
|
|
self.title = br.title()
|
|
except Exception as ex:
|
|
self.title = ''
|
|
|
|
def __repr__(self):
|
|
if not self.title:
|
|
return "%s: %s" % (self.nickname, self.url)
|
|
else:
|
|
return "%s: %s - %s" % (self.nickname, self.url, self.title)
|
|
|
|
class LolBot(SingleServerIRCBot):
|
|
"""
|
|
The Lolbot itself.
|
|
"""
|
|
|
|
def __init__(self, config_path=''):
|
|
self.get_config(config_path)
|
|
SingleServerIRCBot.__init__(self, [(self.server, self.port)], self.nickname, self.nickname)
|
|
|
|
# connect to the database
|
|
self.dbengine = engine_from_config(self.config, prefix="db.")
|
|
SqlBase.metadata.bind = self.dbengine
|
|
SqlBase.metadata.create_all()
|
|
self.get_session = sessionmaker(bind=self.dbengine)
|
|
|
|
self.helptext = "Adds URLs to a list. Commands: list - prints a bunch of URLs; clear - clears the list; lol - say something funny; <url> - adds the URL to the list; help - this message."
|
|
|
|
self.queue = OutputManager(self.connection)
|
|
self.queue.start()
|
|
self.start()
|
|
|
|
def get_config(self, config_path):
|
|
"""
|
|
This method loads configuration options from a lolbot.conf file. The file
|
|
should look like this:
|
|
|
|
irc.server = irc.freenode.net
|
|
irc.port = 6667
|
|
irc.channel = #lolbottest
|
|
irc.nickname = lolbot
|
|
db.url = sqlite:///lolbot.db
|
|
|
|
"""
|
|
|
|
if not config_path:
|
|
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf")
|
|
if not os.path.exists(config_path):
|
|
print "Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option."
|
|
usage()
|
|
sys.exit(1)
|
|
|
|
# open the configuration file and grab all param=value declarations.
|
|
config = {}
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
# skip comments
|
|
if line.strip().startswith("#") or line.strip() == "":
|
|
continue
|
|
|
|
# collect up param = value
|
|
try:
|
|
(param, value) = line.strip().split("=", 1)
|
|
if param.strip() != "":
|
|
config[param.strip()] = value.strip()
|
|
except ValueError:
|
|
continue
|
|
|
|
# validate IRC host
|
|
if "irc.server" not in config.keys():
|
|
print "Error: the IRC server was not specified. Use --help for more information."
|
|
sys.exit(1)
|
|
self.server = config["irc.server"]
|
|
|
|
# validate IRC port
|
|
if "irc.port" not in config.keys():
|
|
config["irc.port"] = "6667"
|
|
try:
|
|
self.port = int(config["irc.port"])
|
|
except ValueError:
|
|
print "Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information."
|
|
sys.exit(1)
|
|
|
|
# validate IRC channel
|
|
if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"):
|
|
print "Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information."
|
|
sys.exit(1)
|
|
self.channel = config["irc.channel"]
|
|
|
|
# validate bot nickname
|
|
if "irc.nickname" not in config.keys():
|
|
config["irc.nickname"] = "lolbot"
|
|
self.nickname = config["irc.nickname"]
|
|
|
|
self.config = config
|
|
|
|
def now(self):
|
|
return datetime.today().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
def save_url(self, nickname, url):
|
|
db = self.get_session()
|
|
if not db.query(Url).filter(Url.url == url).count():
|
|
theurl = Url(nickname, url)
|
|
db.add(theurl)
|
|
db.commit()
|
|
else:
|
|
try:
|
|
theurl = db.query(Url).filter(Url.url == url).one()
|
|
except MultipleResultsFound, ex:
|
|
print ex #wtf
|
|
except NoResultsFound, ex:
|
|
print ex #wtf
|
|
print theurl
|
|
return theurl.title
|
|
|
|
def log_event(self, nick, text):
|
|
entry = Log(nick, text)
|
|
db = self.get_session()
|
|
db.add(entry)
|
|
db.commit()
|
|
print entry
|
|
|
|
def on_nicknameinuse(self, connection, event):
|
|
self.nickname = connection.get_nickname() + "_"
|
|
connection.nick(self.nickname)
|
|
|
|
def on_welcome(self, connection, event):
|
|
connection.join(self.channel)
|
|
|
|
def on_privmsg(self, connection, event):
|
|
"Deal with a /msg private message."
|
|
from_nick = nm_to_n(event.source())
|
|
self.do_command(event, event.arguments()[0], from_nick)
|
|
|
|
def on_pubmsg(self, connection, event):
|
|
"Deal with a public message in a channel."
|
|
|
|
# log it
|
|
from_nick = nm_to_n(event.source())
|
|
self.log_event(from_nick, event.arguments()[0])
|
|
|
|
args = string.split(event.arguments()[0], ":", 1)
|
|
if len(args) > 1 and irc_lower(args[0]) == irc_lower(self.nickname):
|
|
self.do_command(event, string.strip(args[1]), from_nick)
|
|
else:
|
|
# parse it for links, add URLs to the list
|
|
words = event.arguments()[0].split(" ")
|
|
for w in words:
|
|
if w.startswith('http://') or w.startswith('https://'):
|
|
title = self.save_url(from_nick, w)
|
|
self.say_public(title)
|
|
|
|
def say_public(self, text):
|
|
"Print TEXT into public channel, for all to see."
|
|
self.queue.send(text, self.channel)
|
|
self.log_event(self.nickname, text)
|
|
|
|
def say_private(self, nick, text):
|
|
"Send private message of TEXT to NICK."
|
|
self.queue.send(text, nick)
|
|
|
|
def reply(self, text, to_private=None):
|
|
"Send TEXT to either public channel or TO_PRIVATE nick (if defined)."
|
|
if to_private is not None:
|
|
self.say_private(to_private, text)
|
|
else:
|
|
self.say_public(text)
|
|
|
|
def ponder(self):
|
|
"Return a random pondering."
|
|
return random.choice(ponderings)
|
|
|
|
def exclaim(self):
|
|
"Return a random exclamation string."
|
|
return random.choice(exclamations)
|
|
|
|
def do_command(self, event, cmd, from_private):
|
|
"""
|
|
This is the function called whenever someone sends a public or
|
|
private message addressed to the bot. (e.g. "bot: blah").
|
|
"""
|
|
|
|
if event.eventtype() == "pubmsg":
|
|
target = None
|
|
else:
|
|
target = from_private.strip()
|
|
|
|
if cmd == 'help':
|
|
self.reply(self.helptext, target)
|
|
|
|
elif cmd == 'lol':
|
|
self.reply(self.ponder(), target)
|
|
|
|
elif cmd == 'urls' or cmd == 'list':
|
|
db = self.get_session()
|
|
for url in db.query(Url).order_by(Url.timestamp):
|
|
line = "%s %s" % (url.url, url.title)
|
|
self.reply(line, target)
|
|
time.sleep(1)
|
|
|
|
elif cmd.startswith('http:') or cmd.startswith('https:'):
|
|
title = self.save_url(from_private, cmd)
|
|
if title == '':
|
|
self.reply('URL added.', target)
|
|
if title != '':
|
|
self.reply('URL added: %s' % title, target)
|
|
|
|
else:
|
|
self.reply(self.exclaim(), target)
|
|
|
|
|
|
def usage():
|
|
print """Run a lolbot.
|
|
|
|
-h, --help
|
|
This message.
|
|
|
|
-c, --config=<filename>
|
|
Specify a configuration file. Defaults to lolbot.conf in the same
|
|
directory as the script.
|
|
|
|
Configuration:
|
|
|
|
irc.server = <host>
|
|
The IRC server, e.g. irc.freenode.net
|
|
|
|
irc.port = <port>
|
|
The IRC server port. Default: 6667
|
|
|
|
irc.channel = <channel>
|
|
The chat channel to join, e.g. #trainspotting
|
|
|
|
irc.nickname = <nickname>
|
|
The nickname for your lolbot. Default: "lolbot"
|
|
|
|
db.url = <url>
|
|
The URL to your lolbot database. This can be any valid URL accepted
|
|
by SQL Alchemy. If not specified, lolbot will attempt to use a
|
|
SQLite database called lolbot.db in the same directory as the
|
|
script.
|
|
"""
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
(options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ])
|
|
except getopt.GetoptError, err:
|
|
print str(err)
|
|
usage()
|
|
sys.exit(2)
|
|
|
|
config_path = ""
|
|
for option,value in options:
|
|
if option in ("-h", "--help"):
|
|
usage()
|
|
sys.exit(2)
|
|
if option in ("-c", "--config"):
|
|
config_path = value
|
|
|
|
try:
|
|
LolBot(config_path).start()
|
|
except KeyboardInterrupt:
|
|
print "Shutting down."
|
|
|