From 52fd6a5b37fb37e89ecd486cdc0c7c4feddc6df9 Mon Sep 17 00:00:00 2001 From: Jonathan Harker Date: Thu, 19 Nov 2015 23:38:54 +1300 Subject: [PATCH] Rewrite to support basic quiz gameplay. --- lolbot.py | 558 +++++++++++++++++++------------------------------- pymoxquizz.py | 139 ++++++++++++- 2 files changed, 345 insertions(+), 352 deletions(-) mode change 100755 => 100644 lolbot.py diff --git a/lolbot.py b/lolbot.py old mode 100755 new mode 100644 index 8316fd2..26569cf --- a/lolbot.py +++ b/lolbot.py @@ -1,390 +1,264 @@ -#!/usr/bin/env python -# -# LolBot -# -# New version based on Twisted IRC. - +#! /usr/bin/env python """ -Useful bot for folks stuck behind censor walls at work -Logs a channel and collects URLs for later. +LOLBOT 2 + + - die: Let the bot cease to exist. + - ask: Ask a MoxQuizz question. + - list: list some URLs """ -from __future__ import print_function # unicode_literals +from __future__ import print_function, unicode_literals -try: - import sys - import os - import string - import random - import time - import getopt - import sqlite3 - from twisted.words.protocols import irc - from twisted.internet import protocol - from twisted.internet import reactor - from datetime import datetime - from sqlalchemy import create_engine - from sqlalchemy.orm import sessionmaker - from pymoxquizz import QuestionBank, Question - from models import Url, Log, Model -except ImportError: - print("Some modules missing: Lolbot relies on Twisted IRC, Mechanize and SQLAlchemy.\n") - sys.exit - -# Exclamations - wrong input -exclamations = [ - "Zing!", - "Burns!", - "Tard!", - "Lol.", - "Crazy!", - "WTF?", -] - -# Ponderings -ponderings = [ - "Hi, can I have a medium lamb roast, with just potatoes.", - "Can I slurp on your Big Cock?", - "Quentin Tarantino is so awesome I want to have his babies.", - "No it's a week night 8pm is past my bedtime.", -] +import sqlite3 +import random +import time +import irc.strings +from irc.bot import SingleServerIRCBot +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from models import Log, Url, Model +from pymoxquizz import QuestionBank, Question +from os import listdir, path -class LolBot(irc.IRCClient): +DEBUG = True + + +def debug(msg): + if DEBUG: + print(msg) + + +class LolBot(SingleServerIRCBot): """ - The Lolbot itself. + An IRC bot to entertain the troops with MoxQuizz questions, log URLs, and + other shenanigans. """ - def _get_connection(self): - connection = sqlite3.Connection(self.config['db.file']) - connection.text_factory = str - return connection + qb = list() + + def __init__(self, channel, nickname, server, database, port=6667): + debug("Instantiating SingleServerIRCBot") + SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname) + self.channel = channel - def created(self, when): # load some MoxQuizz questions - self.qb = QuestionBank('/home/johnno/questions.doctorlard.en').questions + qfiles = [f for f in listdir('questions') if path.isfile(path.join('questions', f))] + debug("Loading MoxQuizz questions") + for f in qfiles: + qfile = path.abspath(path.join('questions', f)) + debug(" - from MoxQuizz bank '%s'" % qfile) + self.qb += QuestionBank(qfile).questions random.shuffle(self.qb) + self.quiz = 0 self.question = None # connect to the database + debug("Connecting to SQLite database '%s'" % database) + self.dbfile = database self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection) Model.metadata.bind = self.dbengine Model.metadata.create_all() - self.get_session = sessionmaker(bind=self.dbengine) + self.db_session = sessionmaker(bind=self.dbengine) - self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); clear - clears the list; lol - say something funny; - adds the URL to the list; help - this message." + self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); - adds the URL to the list; help - this message." + debug("Exiting lolbot constructor") - def now(self): - return datetime.today().strftime("%Y-%m-%d %H:%M:%S") + def _get_connection(self): + """Creator function for SQLAlchemy.""" + connection = sqlite3.Connection(self.dbfile) + connection.text_factory = str + debug("Creating SQLAlchemy connection") + return connection - def save_url(self, nickname, url): - title = False - try: - db = self.get_session() - if not db.query(Url).filter(Url.url == url).count(): - theurl = Url(nickname, url) - db.add(theurl) - db.commit() - else: - theurl = db.query(Url).filter(Url.url == url).one() - print(theurl) - title = theurl.title - except Exception as ex: - print("Exception caught saving URL: %s" % ex) - return title + def on_nicknameinuse(self, connection, event): + nick = connection.get_nickname() + debug("Nick '%s' in use, trying '%s_'" % nick) + connection.nick(nick + "_") - def log_event(self, nick, text): - try: - entry = Log(nick, text) - db = self.get_session() - db.add(entry) - db.commit() - print(entry) - except Exception as ex: - print("Exception caught logging event: %s" % ex) + def on_welcome(self, connection, event): + debug("Joining channel '%s'" % self.channel) + connection.join(self.channel) - def _get_nickname(self): - return self.factory.nickname - nickname = property(_get_nickname) + def on_privmsg(self, connection, event): + self.do_command(event, event.arguments[0]) - def _get_channel(self): - return self.factory.channel - channel = property(_get_channel) - - def _get_config(self): - return self.factory.config - config = property(_get_config) - - def signedOn(self): - self.join(self.channel) - print("Signed on as %s." % (self.nickname,)) - - def joined(self, channel): - print("Joined %s." % (channel,)) - - def privmsg(self, user, channel, msg): - user = user.split('!')[0] - - if channel == self.nickname: - # Private /msg from a user - self.do_command(msg, user) - else: - # log it - self.log_event(user, msg) - - # deal with MoxQuizz question - answered = False - if isinstance(self.question, Question) and self.question.answer.lower() in msg.lower(): - answered = True - print("%s +1" % user) - self.reply('Correct! %s scores one point.' % user) - if answered: - self.question = None - - args = string.split(msg, ":", 1) - if len(args) > 1 and args[0] == self.nickname: - self.do_command(string.strip(args[1])) - else: - # parse it for links, add URLs to the list - words = msg.split(" ") - for w in words: - if w.startswith('http://') or w.startswith('https://'): - title = self.save_url(user, w) - if title is False: - self.say_public("Sorry, I'm useless at UTF-8.") - else: - self.say_public("URL added. %s" % title) - - def say_public(self, text): - "Print TEXT into public channel, for all to see." - self.notice(self.channel, text) - self.log_event(self.nickname, text) - - def say_private(self, nick, text): - "Send private message of TEXT to NICK." - self.notice(nick, text) - - def reply(self, text, to_private=None): - "Send TEXT to either public channel or TO_PRIVATE nick (if defined)." - if to_private is not None: - self.say_private(to_private, text) - else: - self.say_public(text) - - def ponder(self): - "Return a random pondering." - return random.choice(ponderings) - - def exclaim(self): - "Return a random exclamation string." - return random.choice(exclamations) - - def do_command(self, cmd, target=None): + def on_pubmsg(self, connection, event): """ - This is the function called whenever someone sends a public or - private message addressed to the bot. (e.g. "bot: blah"). + Handle an event on the channel. + Handle commands addressed to the bot. + If there's a question, see if it's been answered. """ - try: - if cmd == 'help': - self.reply(self.helptext, target) + (nick, message) = event.arguments[0].split(":", 1) + # handle command, if addressed + if irc.strings.lower(nick) == irc.strings.lower(self.connection.get_nickname()): + self.do_command(event, message.strip()) + except ValueError: + message = event.arguments[0] + nick = event.source.nick - elif cmd == 'ask': - self.question = random.choice(self.qb) - self.reply(str(self.question.question)) + # deal with MoxQuizz question + if self.quiz: + self.handle_quiz(nick, message) - elif cmd == 'lol': - self.reply(self.ponder(), target) + def start_quiz(self, nick): + self.quiz = 0 + self.quiz_scores = dict() + self.connection.notice(self.channel, 'Quiz begun by %s.' % nick) + self.quiz_get_next() - elif cmd == 'urls' or cmd == 'list': - db = self.get_session() - for url in db.query(Url).order_by(Url.timestamp.desc())[:10]: - line = "%s %s" % (url.url, url.title) - self.reply(line, target) - time.sleep(1) + def stop_quiz(self): + self.quiz = 0 + self.quiz_scores = None + self.question = None - elif cmd.startswith('urls ') or cmd.startswith('list '): - db = self.get_session() - (listcmd, n) = cmd.split(" ", 1) - n = n.strip() - if n == "all": - rows = db.query(Url).order_by(Url.timestamp.desc()) - elif n.find("-") > 0: - (x, y) = n.split("-", 1) - try: - x = abs(int(x)) - y = abs(int(y)) - if y < x: - x, y = y, x - except ValueError as ex: - self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target) - raise ex - rows = db.query(Url).order_by(Url.timestamp.desc())[x-1:y] - else: - try: - n = abs(int(n)) - except ValueError as ex: - self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target) - raise ex - rows = db.query(Url).order_by(Url.timestamp.desc())[:n] + def quiz_get_next(self): + self.quiz += 1 + self.question = random.choice(self.qb) + print(str(self.question.question)) + self.connection.notice(self.channel, str(self.question.question)) - for url in rows: - line = "%s %s" % (url.url, url.title) - self.reply(line, target) - time.sleep(1) + def quiz_award_points(self, nick): + score = "%s point" % self.question.score + if self.question.score != 1: + score += "s" - elif cmd.startswith('http:') or cmd.startswith('https:'): - title = self.save_url(target, cmd) - if title is False: - self.say_public("Sorry, I'm useless at UTF-8.") - else: - self.reply('URL added. %s' % title, target) + self.connection.notice(self.channel, 'Correct! The answer was %s. %s scores %s.' % (self.question.answer, nick, score)) + if nick not in self.quiz_scores.keys(): + self.quiz_scores[nick] = 0 + self.quiz_scores[nick] += self.question.score + def quiz_check_win(self, nick): + if self.quiz_scores[nick] == 10: + self.connection.notice(self.channel, '%s wins with 10 points!' % nick) + self.quiz_scoreboard() + self.stop_quiz() + + def quiz_scoreboard(self): + self.connection.notice(self.channel, 'Scoreboard:') + for nick in self.quiz_scores.keys(): + score = "%s point" % self.quiz_scores[nick] + if self.quiz_scores[nick] != 1: + score += "s" + self.connection.notice(self.channel, '%s has %s.' % (nick, score)) + + def handle_quiz(self, nick, message): + # bail if there's no quiz or unanswered question. + if not self.quiz or not isinstance(self.question, Question): + return + + # see if anyone answered correctly. + if self.question.attempt(message): + self.quiz_award_points(nick) + self.quiz_check_win(nick) + # if nobody has won, carry on + if self.quiz: + self.quiz_get_next() + + def do_command(self, e, cmd): + """ + Handle bot commands. + """ + nick = e.source.nick + c = self.connection + + if cmd == "die": + self.die() + + elif cmd == 'help': + c.notice(nick, self.helptext) + + elif cmd == 'status': + c.notice(nick, "I know %s questions." % len(self.qb)) + + elif cmd == 'halt' or cmd == 'quit': + if self.quiz: + self.quiz_scoreboard() + self.stop_quiz() + c.notice(self.channel, "Quiz halted by %s. Use ask to start a new one." % nick) else: - self.reply(self.exclaim(), target) + c.notice(self.channel, "No quiz running.") - except Exception as ex: - print("Exception caught processing command: %s" % ex) - print(" command was '%s' from %s" % (cmd, target)) - self.reply("Sorry, I didn't understand: %s" % cmd, target) - self.reply(self.helptext, target) + elif cmd == 'ask': + if self.quiz: + c.notice(self.channel, "Quiz is running. Use halt or quit to stop.") + c.notice(self.channel, str(self.question.question)) + elif isinstance(self.question, Question): + c.notice(self.channel, "There is an unanswered question.") + c.notice(self.channel, str(self.question.question)) + else: + self.start_quiz(nick) + elif cmd == 'revolt': + if isinstance(self.question, Question): + c.notice(self.channel, "Fine, the answer is: %s" % self.question.answer) + self.quiz_get_next() -class LolBotFactory(protocol.ClientFactory): - protocol = LolBot - - def __init__(self, config_path): - self.config = get_config(config_path) - self.server = self.config['irc.server'] - self.port = self.config['irc.port'] - self.nickname = self.config['irc.nickname'] - self.channel = self.config['irc.channel'] - - def clientConnectionLost(self, connector, reason): - print("Lost connection (%s), reconnecting." % (reason,)) - connector.connect() - - def clientConnectionFailed(self, connector, reason): - print("Could not connect: %s" % (reason,)) - - -def get_options(): - try: - (options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ]) - except getopt.GetoptError as err: - print(str(err)) - usage() - sys.exit(2) - - config_path = "" - for option, value in options: - if option in ("-h", "--help"): - usage() - sys.exit(2) - if option in ("-c", "--config"): - config_path = value - return {'config_path': config_path} - - -def get_config(config_path): - """ - This method loads configuration options from a lolbot.conf file. The file - should look like this: - - irc.server = irc.freenode.net - irc.port = 6667 - irc.channel = #lolbottest - irc.nickname = lolbot - db.url = sqlite:///lolbot.db - - """ - - if not config_path: - config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf") - if not os.path.exists(config_path): - print("Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option.") - usage() - sys.exit(1) - - # open the configuration file and grab all param=value declarations. - config = {} - with open(config_path) as f: - for line in f: - # skip comments - if line.strip().startswith("#") or line.strip() == "": - continue - - # collect up param = value + elif cmd.startswith('urls') or cmd.startswith('list'): + db = self.db_session() try: - (param, value) = line.strip().split("=", 1) - if param.strip() != "": - config[param.strip()] = value.strip() + (listcmd, n) = cmd.split(" ", 1) except ValueError: - continue + n = '5' - # validate IRC host - if "irc.server" not in config.keys(): - print("Error: the IRC server was not specified. Use --help for more information.") + n = n.strip() + if n == "all": + rows = db.query(Url).order_by(Url.timestamp.desc()) + elif n.find("-") > 0: + (x, y) = n.split("-", 1) + try: + x = abs(int(x)) + y = abs(int(y)) + if y < x: + x, y = y, x + except ValueError as ex: + c.notice(nick, "Give me a number or a range of numbers, e.g. list 5 or list 11-20") + raise ex + rows = db.query(Url).order_by(Url.timestamp.desc())[x - 1: y] + else: + try: + n = abs(int(n)) + except ValueError as ex: + c.notice(nick, "Give me a number or a range of numbers, e.g. list 5 or list 11-20") + raise ex + rows = db.query(Url).order_by(Url.timestamp.desc())[:n] + + for url in rows: + line = "%s %s" % (url.url, url.title) + c.notice(nick, line) + time.sleep(1) + + else: + c.notice(nick, "Not understood: " + cmd) + + +def main(): + import sys + if len(sys.argv) != 5: + print("Usage: lolbot2.py ") sys.exit(1) - # validate IRC port - if "irc.port" not in config.keys(): - config["irc.port"] = "6667" - try: - config["irc.port"] = int(config["irc.port"]) - except ValueError: - print("Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information.") - sys.exit(1) + s = sys.argv[1].split(":", 1) + server = s[0] + if len(s) == 2: + try: + port = int(s[1]) + except ValueError: + print("Error: Erroneous port.") + sys.exit(1) + else: + port = 6667 + channel = sys.argv[2] + nickname = sys.argv[3] + database = sys.argv[4] - # validate IRC channel - if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"): - print("Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information.") - sys.exit(1) + debug("Parameters: server=%s port=%s nickname=%s channel=%s database=%s" % (server, port, nickname, channel, database)) - # validate bot nickname - if "irc.nickname" not in config.keys(): - config["irc.nickname"] = "lolbot" - - return config - - -def usage(): - print("""Run a lolbot. - - -h, --help - This message. - - -c, --config= - Specify a configuration file. Defaults to lolbot.conf in the same - directory as the script. - -Configuration: - - irc.server = - The IRC server, e.g. irc.freenode.net - - irc.port = - The IRC server port. Default: 6667 - - irc.channel = - The chat channel to join, e.g. #trainspotting - - irc.nickname = - The nickname for your lolbot. Default: "lolbot" - - db.url = - The URL to your lolbot database. This can be any valid URL accepted - by SQL Alchemy. If not specified, lolbot will attempt to use a - SQLite database called lolbot.db in the same directory as the - script. -""") + irc.client.ServerConnection.buffer_class = irc.buffer.LenientDecodingLineBuffer + bot = LolBot(channel, nickname, server, database, port) + bot.start() if __name__ == "__main__": - args = get_options() - config = get_config(args['config_path']) - try: - reactor.connectTCP(config['irc.server'], config['irc.port'], LolBotFactory(args['config_path'])) - reactor.run() - except KeyboardInterrupt: - print("Shutting down.") + main() diff --git a/pymoxquizz.py b/pymoxquizz.py index 81c34fe..300d14c 100644 --- a/pymoxquizz.py +++ b/pymoxquizz.py @@ -1,7 +1,14 @@ #!/usr/bin/env python ## -*- coding: utf-8 -*- +""" +A MozQuizz question library for Python. +See http://moxquizz.de/ for the original implementation in TCL. +""" + from __future__ import unicode_literals, print_function from io import open +import re +import sys class Question: @@ -10,30 +17,103 @@ class Question: """ category = None + """ + The question category. Arbitrary text; optional. + """ + question = None + """ + The question. Arbitrary text; required. + """ + answer = None + """ + The answer. Arbitrary text; required. Correct answers can also be covered + by the :attr:`regexp` property. + """ + regexp = None + """ + A regular expression that will generate correct answers. Optional. See + also the :attr:`answer` property. + """ + author = None - level = None + """ + The question author. Arbitrary text; optional. + """ + + level = None # Default: NORMAL (constructor) + """ + The difficulty level. Value must be from the :attr:`LEVELS` tuple. + The default value is :attr:`NORMAL`. + """ + comment = None - score = 0 + """ + A comment. Arbitrary text; optional. + """ + + score = 1 + """ + The points scored for the correct answer. Integer value; default is 1. + """ tip = list() + """ + An ordered list of tips (hints) to display to users. Optional. + """ + tipcycle = 0 + """ + Indicates which tip is to be displayed next, if any. + """ TRIVIAL = 1 + """ + A value for :attr:`level` that indicates a question of trivial difficulty. + """ + EASY = 2 + """ + A value for :attr:`level` that indicates a question of easy difficulty. + """ + NORMAL = 3 + """ + A value for :attr:`level` that indicates a question of average or normal + difficulty. + """ + HARD = 4 + """ + A value for :attr:`level` that indicates a question of hard difficulty. + """ + EXTREME = 5 + """ + A value for :attr:`level` that indicates a question of extreme difficulty + or obscurity. + """ LEVELS = (TRIVIAL, EASY, NORMAL, HARD, EXTREME) + """ + The available :attr:`level` difficulty values, :attr:`TRIVIAL`, :attr:`EASY`, + :attr:`NORMAL`, :attr:`HARD` and :attr:`EXTREME`. + """ def __init__(self, attributes_dict): + """ + Constructor that takes a dictionary of MoxQuizz key-value pairs. Usually + called from a :class:`QuestionBank`. + """ + # Set defaults first. + self.level = self.NORMAL self.parse(attributes_dict) def parse(self, attributes_dict): """ - Populate fields from a dictionary of attributes (from a question bank). + Populate fields from a dictionary of attributes, usually provided by a + :class:`QuestionBank` :attr:`~QuestionBank.parse` call. """ ## Valid keys: @@ -69,7 +149,9 @@ class Question: self.category = attributes_dict['Author'] if 'Level' in attributes_dict.keys() and attributes_dict['Level'] in self.LEVELS: - self.level = attributes_dict['level'] + self.level = attributes_dict['Level'] + elif 'Level' in attributes_dict.keys() and attributes_dict['Level'] in QuestionBank.LEVEL_VALUES.keys(): + self.level = QuestionBank.LEVEL_VALUES[attributes_dict['Level']] if 'Comment' in attributes_dict.keys(): self.comment = attributes_dict['Comment'] @@ -83,6 +165,9 @@ class Question: if 'Tipcycle' in attributes_dict.keys(): self.tipcycle = attributes_dict['Tipcycle'] + def attempt(self, answer): + return (self.answer is not None and self.answer.lower() == answer.lower()) or ( + self.regexp is not None and re.search(self.regexp, answer, re.IGNORECASE) is not None) class QuestionBank: """ @@ -90,7 +175,15 @@ class QuestionBank: """ filename = '' + """ + The path or filename of the question bank file. + """ + questions = list() + """ + A list of :class:`Question` objects, constituting the questions in the + question bank. + """ # Case sensitive, to remain backwards-compatible with MoxQuizz. KEYS = ('Answer', @@ -104,17 +197,34 @@ class QuestionBank: 'Tip', 'Tipcycle', ) + """ + The valid attributes available in a MoxQuizz question bank file. + """ + + LEVEL_VALUES = { + 'trivial': Question.TRIVIAL, + 'baby': Question.TRIVIAL, + 'easy': Question.EASY, + 'normal': Question.NORMAL, + 'hard': Question.HARD, + 'difficult': Question.HARD, + 'extreme': Question.EXTREME + } + """ + Text labels for the :attr:`Question.level` difficulty values. + """ def __init__(self, filename): """ - Construct a question bank from a file. + Constructor, takes a MozQuizz-formatted question bank filename. """ self.filename = filename self.questions = self.parse(filename) def parse(self, filename): """ - Read a Moxquizz question bank file into a list. + Read a MoxQuizz-formatted question bank file. Returns a ``list`` of + :class:`Question` objects found in the file. """ questions = list() @@ -149,6 +259,8 @@ class QuestionBank: # Fetch the next parameter. try: (key, value) = line.split(':', 1) + key = key.strip() + value = value.strip() except ValueError: print("Unexpected weirdness in MoxQuizz questionbank '%s', line %s." % (self.filename, i)) continue @@ -162,6 +274,11 @@ class QuestionBank: # Enumerate the Tips. if key == 'Tip': q['Tip'].append(value.strip()) + elif key == 'Level': + if value not in self.LEVEL_VALUES: + print("Unexpected Level value '%s' in MoxQuizz questionbank '%s', line '%s'." % (value, self.filename, i)) + else: + q['Level'] = self.LEVEL_VALUES[value] else: q[key] = value.strip() @@ -170,12 +287,14 @@ class QuestionBank: # A crappy test. if __name__ == '__main__': - qb = QuestionBank('questions.doctorlard.en') + qb = QuestionBank('questions/questions.doctorlard.en') for q in qb.questions: print(q.question) - a = unicode(raw_input('A: '), 'utf8') - #a = input('A: ') # Python 3 - if a.lower() == q.answer.lower(): + if sys.version.startswith('2'): + a = unicode(raw_input('A: '), 'utf8') + else: + a = input('A: ') + if q.attempt(a): print("Correct!") else: print("Incorrect - the answer is '%s'" % q.answer)