526 lines
17 KiB
Python
526 lines
17 KiB
Python
#! /usr/bin/env python
|
|
"""
|
|
LOLBOT 2
|
|
|
|
- die: Let the bot cease to exist.
|
|
- ask: Ask a MoxQuizz question.
|
|
- list: list some URLs
|
|
"""
|
|
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import sys
|
|
import os
|
|
import sqlite3
|
|
import random
|
|
import time
|
|
import getopt
|
|
import irc.strings
|
|
from irc.bot import SingleServerIRCBot
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from models import Log, Url, Model
|
|
from pymoxquizz import QuestionBank, Question
|
|
from os import listdir, path
|
|
|
|
|
|
# Module-wide switch for the diagnostic output produced by debug().
DEBUG = True


def debug(msg):
    """
    Print msg on standard output, but only while DEBUG is truthy.
    """
    if not DEBUG:
        return
    print(msg)
|
|
|
|
|
|
class LolBot(SingleServerIRCBot):
|
|
"""
|
|
An IRC bot to entertain the troops with MoxQuizz questions, log URLs, and
|
|
other shenanigans.
|
|
"""
|
|
|
|
qb = list()
|
|
|
|
def __init__(self, config=None):
    """
    Constructor. Instantiates a lolbot with a configuration dictionary,
    or from command-line options if none is specified.

    The configuration is checked by validate_config(), which also fills in
    defaults; the process exits with status 1 if it is rejected. The
    constructor then initialises the IRC bot, loads every MoxQuizz file
    found in the 'questions' directory (a path relative to the current
    working directory) and prepares the SQLite session factory.
    """

    if not config:
        config = LolBot.get_options()

    if not self.validate_config(config):
        sys.exit(1)

    self.config = config
    (server, port, channel, nickname, database) = (
        config['irc.server'],
        config['irc.port'],
        config['irc.channel'],
        config['irc.nickname'],
        config['db.file'])

    debug("Instantiating SingleServerIRCBot")
    # Assigned on the ServerConnection class itself, so it applies to every
    # connection created in this process, not just this bot's.
    irc.client.ServerConnection.buffer_class = irc.buffer.LenientDecodingLineBuffer
    SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
    self.channel = channel

    # load some MoxQuizz questions
    # Use a list owned by this instance. "self.qb += ..." on the class-level
    # list would extend that shared list in place, so every LolBot instance
    # would see (and keep adding to) the same questions.
    self.qb = []
    qfiles = [f for f in listdir('questions') if path.isfile(path.join('questions', f))]
    debug("Loading MoxQuizz questions")
    for f in qfiles:
        qfile = path.abspath(path.join('questions', f))
        debug(" - from MoxQuizz bank '%s'" % qfile)
        self.qb.extend(QuestionBank(qfile).questions)
    random.shuffle(self.qb)

    # Quiz state: question counter (0 means no quiz running), the current
    # question, the per-nick scores and the index of the next tip.
    self.quiz = 0
    self.question = None
    self.quiz_scores = None
    self.tip = 0

    # connect to the database
    debug("Connecting to SQLite database '%s'" % database)
    self.dbfile = database
    self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection)
    Model.metadata.bind = self.dbengine
    Model.metadata.create_all()
    # Calling self.get_db() returns a new session bound to the engine.
    self.get_db = sessionmaker(bind=self.dbengine)

    self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); <url> - adds the URL to the list; help - this message."
    debug("Exiting lolbot constructor")
|
|
|
|
def _get_connection(self):
    """
    Creator function for SQLAlchemy.

    Opens a sqlite3 connection to self.dbfile, sets its text_factory to
    str and returns it.
    """
    conn = sqlite3.Connection(self.dbfile)
    conn.text_factory = str
    debug("Creating SQLAlchemy connection")
    return conn
|
|
|
|
@property
def nickname(self):
    """
    The nickname currently reported by the server connection.
    """
    current = self.connection.get_nickname()
    return current
|
|
|
|
def say_public(self, text):
    """
    Send text to the bot's channel, then record it through log_event()
    under the bot's own nickname.
    """
    target = self.channel
    self.connection.privmsg(target, text)
    speaker = self.nickname
    self.log_event(speaker, text)
|
|
|
|
def say_private(self, nick, text):
    """
    Send text to nick as a private message. Nothing is logged.
    """
    conn = self.connection
    conn.privmsg(nick, text)
|
|
|
|
def reply(self, text, to_private=None):
    """
    Deliver text either privately or publicly.

    When to_private is given (anything other than None) the text goes to
    that nick via say_private(); otherwise it is said in the channel via
    say_public().
    """
    if to_private is None:
        self.say_public(text)
        return
    self.say_private(to_private, text)
|
|
|
|
def on_nicknameinuse(self, connection, event):
    """
    Handler for the "nickname in use" event: retry with the current
    nickname plus a trailing underscore.
    """
    taken = self.nickname
    debug("Nick '%s' in use, trying '%s_'" % (taken, taken))
    connection.nick(taken + "_")
|
|
|
|
def on_welcome(self, connection, event):
    """
    Handler for the "welcome" event: join the configured channel.
    """
    target = self.channel
    debug("Joining channel '%s' as %s" % (target, self.nickname))
    connection.join(target)
|
|
|
|
def on_privmsg(self, connection, event):
    """
    Handle a /msg from a user.

    The whole message text (the event's first argument) is treated as a
    bot command and passed to do_command().
    """
    self.do_command(event, event.arguments[0])
|
|
|
|
def on_pubmsg(self, connection, event):
    """
    Handle an event on the channel.

    Handle commands addressed to the bot ("<botnick>: command") or given
    with the "!" shortcut, then log the message, offer it to a running
    quiz as an answer, and record any URLs it contains.
    """
    text = event.arguments[0]
    # The speaker is always the event source. Previously, any message of
    # the form "<something>: <rest>" had <something> used as the speaker,
    # so logs, quiz points and URLs were credited to the wrong name.
    nick = event.source.nick

    command = None
    addressee, separator, remainder = text.partition(": ")
    if separator:
        # As before, only the part after "<name>: " is logged and examined.
        message = remainder
        if irc.strings.lower(addressee) == irc.strings.lower(self.connection.get_nickname()):
            command = message.strip()
    else:
        message = text
        if message.startswith('!'):
            command = message.lstrip('!')

    # Handle bot commands if addressed by nick or using ! shortcut.
    if command is not None:
        try:
            self.do_command(event, command)
        except ValueError as ex:
            # A command with unparseable arguments must not abort the rest
            # of the handler (logging, quiz, URL recording).
            debug("Command '%s' rejected: %s" % (command, ex))

    # Log it.
    self.log_event(nick, message)

    # Deal with MoxQuizz question.
    if self.quiz:
        self.handle_quiz(nick, message)

    # Record URLs.
    for word in message.split(" "):
        if word.startswith(('http://', 'https://')):
            title = self.save_url(nick, word)
            # save_url() returns False when it could not store or load the URL.
            if title is False:
                self.say_public("Failed to record URL, or no title found.")
            else:
                self.say_public("URL added. %s" % title)
|
|
|
|
def save_url(self, nickname, url):
    """
    Store url as posted by nickname unless it is already in the database.

    Returns the title attribute of the stored (or already existing) Url
    row, or False if anything went wrong; the error is printed on stdout.
    """
    title = False
    db = None
    try:
        db = self.get_db()
        # One lookup serves both as the existence check and the fetch.
        theurl = db.query(Url).filter(Url.url == url).first()
        if theurl is None:
            theurl = Url(nickname, url)
            db.add(theurl)
            db.commit()
        # NOTE(review): indentation was lost in the reviewed source; this
        # print is assumed to run for new and existing URLs alike.
        print(theurl)
        title = theurl.title
    except Exception as ex:
        print("Exception caught saving URL: %s" % ex)
    finally:
        # Always release the session; it was previously never closed.
        if db is not None:
            db.close()
    return title
|
|
|
|
def log_event(self, nick, text):
    """
    Store a Log entry recording that nick said text.

    Best effort: any failure is printed on stdout and otherwise ignored,
    so logging never interrupts message handling.
    """
    db = None
    try:
        entry = Log(nick, text)
        db = self.get_db()
        db.add(entry)
        db.commit()
        print(entry)
    except Exception as ex:
        print("Exception caught logging event: %s" % ex)
    finally:
        # Always release the session; it was previously never closed.
        if db is not None:
            db.close()
|
|
|
|
def start_quiz(self, nick):
    """
    Begin a new quiz on behalf of nick: reset the question counter and
    the scores, announce the quiz on the channel and ask the first question.
    """
    self.quiz_scores = {}
    self.quiz = 0
    announcement = 'Quiz begun by %s.' % nick
    self.connection.notice(self.channel, announcement)
    self.quiz_get_next()
|
|
|
|
def stop_quiz(self):
    """
    End the quiz: forget the current question and the scores and set the
    question counter back to 0 (which marks the quiz as not running).
    """
    self.question = None
    self.quiz_scores = None
    self.quiz = 0
|
|
|
|
def quiz_get_next(self):
    """
    Move the quiz on to a new, randomly chosen question and announce it
    on the channel.

    If no questions are loaded, the channel is told so and the quiz is
    stopped, instead of letting random.choice() raise IndexError.
    """
    if not self.qb:
        self.connection.notice(self.channel, "I don't know any questions, so the quiz cannot continue.")
        self.stop_quiz()
        return

    self.quiz += 1
    # Tips for the new question start again from the first one.
    self.tip = 0
    self.question = random.choice(self.qb)
    print(self.question.question)
    self.connection.notice(self.channel, "Question %s: %s" % (self.quiz, self.question.question))
|
|
|
|
def quiz_tip(self):
    """
    Announce the next unused tip for the current question, or say that
    there are none left.
    """
    tips = self.question.tip
    if self.tip >= len(tips):
        self.connection.notice(self.channel, "No more tips.")
        return
    self.connection.notice(self.channel, "Tip: %s" % tips[self.tip])
    self.tip += 1
|
|
|
|
def quiz_award_points(self, nick):
    """
    Add the current question's score to nick's total, then announce the
    correct answer and nick's new total on the channel (one second apart).
    """
    scores = self.quiz_scores
    scores.setdefault(nick, 0)
    scores[nick] += self.question.score

    total = scores[nick]
    # "1 point", but "0 points", "2 points", ...
    suffix = "" if total == 1 else "s"
    score = ("%s point" % total) + suffix

    notify = self.connection.notice
    notify(self.channel, 'Correct! The answer was %s.' % self.question.answer)
    time.sleep(1)
    notify(self.channel, '%s is on %s.' % (nick, score))
|
|
|
|
def quiz_check_win(self, nick, winning_score=10):
    """
    End the quiz if nick has reached the winning score.

    nick must already have an entry in self.quiz_scores. winning_score is
    the number of points needed to win (10 by default). The comparison is
    ">=" because points are added in steps of the question's score, so a
    total can jump past the threshold without ever being equal to it.
    """
    points = self.quiz_scores[nick]
    if points >= winning_score:
        self.connection.notice(self.channel, '%s wins with %s points!' % (nick, points))
        # Show the final scores before stop_quiz() discards them.
        self.quiz_scoreboard()
        self.stop_quiz()
|
|
|
|
def quiz_scoreboard(self):
    """
    Announce every player's score on the channel.

    Says 'Quiz not running.' instead when no quiz is in progress, and adds
    a closing remark when nobody has scored yet.
    """
    notify = self.connection.notice

    if not self.quiz:
        notify(self.channel, 'Quiz not running.')
        return

    notify(self.channel, 'Scoreboard:')
    for nick, points in self.quiz_scores.items():
        # "1 point", but "0 points", "2 points", ...
        score = "%s point" % points
        if points != 1:
            score += "s"
        notify(self.channel, '%s has %s.' % (nick, score))

    if not self.quiz_scores:
        notify(self.channel, 'So far, nobody has got anything right.')
|
|
|
|
def handle_quiz(self, nick, message):
    """
    Check whether message, said by nick, answers the current question.

    On a correct answer: award points, check for a winner and, if the quiz
    is still running, show the scoreboard after every tenth question and
    ask the next one.
    """
    # NOTE(review): indentation was lost in the reviewed source; everything
    # after the award is assumed to happen only for a correct answer.

    # bail if there's no quiz or unanswered question.
    quiz_active = self.quiz and isinstance(self.question, Question)
    if not quiz_active:
        return

    # see if anyone answered correctly.
    if not self.question.attempt(message):
        return

    self.quiz_award_points(nick)
    time.sleep(1)
    self.quiz_check_win(nick)

    # quiz_check_win() stops the quiz when somebody has won.
    if not self.quiz:
        return

    # scores every 10 questions.
    if self.quiz % 10 == 0:
        self.quiz_scoreboard()

    time.sleep(1)
    self.quiz_get_next()
|
|
|
|
def do_command(self, e, cmd):
    """
    Handle bot commands.

    e is the IRC event that carried the command (its source nick receives
    private replies); cmd is the command text without any addressing
    prefix. Unknown commands are answered with "Not understood".
    """
    nick = e.source.nick
    c = self.connection

    if cmd == "die":
        self.die()

    elif cmd == 'help':
        c.notice(nick, self.helptext)

    elif cmd == 'status':
        c.notice(self.channel, "I know %s questions." % len(self.qb))
        # NOTE(review): indentation was lost in the reviewed source; the
        # scoreboard is assumed to be shown only while a quiz is running.
        if self.quiz:
            c.notice(self.channel, "I am currently running a quiz.")
            self.quiz_scoreboard()

    elif cmd in ('halt', 'quit'):
        if self.quiz:
            # Show the scores before stop_quiz() discards them.
            self.quiz_scoreboard()
            self.stop_quiz()
            c.notice(self.channel, "Quiz halted by %s. Use ask to start a new one." % nick)
        else:
            c.notice(self.channel, "No quiz running.")

    elif cmd == 'scores':
        if self.quiz:
            self.quiz_scoreboard()

    elif cmd in ('tip', 'hint'):
        if self.quiz:
            self.quiz_tip()

    elif cmd == 'ask':
        if self.quiz:
            c.notice(self.channel, "Quiz is running. Use halt or quit to stop.")
            c.notice(self.channel, self.question.question)
        elif isinstance(self.question, Question):
            c.notice(self.channel, "There is an unanswered question.")
            c.notice(self.channel, self.question.question)
        else:
            self.start_quiz(nick)

    elif cmd == 'revolt':
        if isinstance(self.question, Question):
            c.notice(self.channel, "Fine, the answer is: %s" % self.question.answer)
            self.quiz_get_next()

    elif cmd.startswith(('urls', 'list')):
        # Everything after the first space selects the rows: "all", "n" or
        # "x-y". NOTE(review): the help text advertises 10 as the default,
        # the code has always used 5 - confirm which is intended.
        n = cmd.partition(" ")[2].strip() or '5'

        # Work out the 1-based, inclusive range before touching the
        # database; last stays None for "all".
        first, last = 1, None
        if n != "all":
            try:
                if n.find("-") > 0:
                    (x, y) = n.split("-", 1)
                    first, last = sorted((abs(int(x)), abs(int(y))))
                    # "0-5" would otherwise become a negative slice start.
                    first = max(first, 1)
                else:
                    last = abs(int(n))
            except ValueError:
                # Tell the user and stop here. Re-raising would propagate
                # into the IRC event handler that called us.
                c.notice(nick, "Give me a number or a range of numbers, e.g. list 5 or list 11-20")
                return

        db = self.get_db()
        try:
            rows = db.query(Url).order_by(Url.timestamp.desc())
            if last is not None:
                rows = rows[first - 1: last]

            for url in rows:
                line = "%s %s" % (url.url, url.title)
                c.notice(nick, line)
                # Pause between lines rather than sending them all at once.
                time.sleep(1)
        finally:
            # Always release the session; it was previously never closed.
            db.close()

    else:
        c.notice(nick, "Not understood: " + cmd)
|
|
|
|
def validate_config(self, config):
    """
    Basic checks for configuration parameters. Returns a Boolean indicating
    success or failure.

    The dictionary is updated in place: missing optional settings receive
    their defaults and the port is converted to an int. Problems are
    reported on stdout.
    """

    # the IRC host is mandatory
    if 'irc.server' not in config:
        print("Error: the IRC server was not specified. Use --help for more information.")
        return False

    # the IRC port defaults to 6667 and must convert to an integer
    config.setdefault('irc.port', '6667')
    try:
        config['irc.port'] = int(config['irc.port'])
    except ValueError:
        print("Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information.")
        return False

    # the IRC channel is mandatory and must start with '#'
    channel_ok = 'irc.channel' in config and config['irc.channel'].startswith('#')
    if not channel_ok:
        print("Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information.")
        return False

    # default bot nickname
    config.setdefault('irc.nickname', 'lolbot')

    # default database file
    config.setdefault('db.file', 'lolbot.db')

    return True
|
|
|
|
@staticmethod
|
|
def get_options():
|
|
"""
|
|
Set up configuration from the script arguments.
|
|
"""
|
|
|
|
try:
|
|
(options, args) = getopt.getopt(sys.argv[1:], 'hc:s:p:j:n:d:', ['help', 'config=', 'server=', 'port=', 'join=', 'nick=', 'database=', ])
|
|
except getopt.GetoptError as err:
|
|
print(err)
|
|
LolBot.usage()
|
|
sys.exit(2)
|
|
|
|
config = {}
|
|
for option, value in options:
|
|
# Display help text.
|
|
if option in ('-h', '--help'):
|
|
LolBot.usage()
|
|
sys.exit(2)
|
|
|
|
# Get configuration from a file.
|
|
if option in ('-c', '--config'):
|
|
config = LolBot.load_config(value)
|
|
break
|
|
|
|
# Individually specified settings.
|
|
if option in ('-s', '--server'):
|
|
config['irc.server'] = value
|
|
if option in ('-p', '--port'):
|
|
config['irc.port'] = value
|
|
if option in ('-j', '--join', '--channel', '--join-channel'):
|
|
config['irc.channel'] = value
|
|
if option in ('-n', '--nickname'):
|
|
config['irc.nickname'] = value
|
|
if option in ('-d', '--database'):
|
|
config['db.file'] = value
|
|
|
|
return config
|
|
|
|
@staticmethod
|
|
def load_config(config_path=''):
|
|
"""
|
|
This method loads configuration options from a lolbot.conf file. The file
|
|
should look something like this::
|
|
|
|
irc.server = irc.yourdomain.com
|
|
irc.port = 6667
|
|
irc.channel = #lolbottest
|
|
irc.nickname = lolbot
|
|
db.file = lolbot.db
|
|
"""
|
|
|
|
if config_path == '':
|
|
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lolbot.conf')
|
|
if not os.path.exists(config_path):
|
|
print("Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option.")
|
|
LolBot.usage()
|
|
sys.exit(1)
|
|
|
|
# open the configuration file and grab all param=value declarations.
|
|
config = {}
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
# skip comments
|
|
if line.strip().startswith('#') or line.strip() == '':
|
|
continue
|
|
|
|
# collect up param = value
|
|
try:
|
|
(param, value) = line.strip().split('=', 1)
|
|
if param.strip() != '':
|
|
config[param.strip()] = value.strip()
|
|
except ValueError:
|
|
continue
|
|
|
|
return config
|
|
|
|
@staticmethod
def usage():
    """
    Print the command-line help on standard output.
    """
    # NOTE(review): any leading indentation inside this text was not
    # visible in the reviewed source - confirm the intended layout.
    help_text = """Run a lolbot.

-h, --help
This message.

-s, --server=<hostname>
The IRC server, e.g. irc.freenode.net

-p, --port=<port>
The IRC server port. Default: 6667

-j, --join=<channel>
The chat channel to join, e.g. #trainspotting

-n, --nickname=<nickname>
The nickname for your lolbot. Default: "lolbot"

-d, --database=<path>
The path to a SQLite lolbot database. If not specified, lolbot will
attempt to use a SQLite database called lolbot.db in the same
directory as the script.

Configuration file:

-c, --config=<filename>
Specify a configuration file. Ignores any options specified from the
command-line. Defaults to lolbot.conf in the same directory as the
script. File layout:

irc.server = <host>
irc.port = <port>
irc.channel = <channel>
irc.nickname = <nickname>
db.file = <path>
""".strip()
    print(help_text)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot from the command-line options and
    # run it until it exits or the user interrupts it.
    try:
        bot = LolBot()
        bot.start()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the bot; exit without a traceback.
        pass
|