282 lines
9.2 KiB
Python
282 lines
9.2 KiB
Python
#! /usr/bin/env python
|
|
"""
|
|
LOLBOT 2
|
|
|
|
- die: Let the bot cease to exist.
|
|
- ask: Ask a MoxQuizz question.
|
|
- list: list some URLs
|
|
"""
|
|
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import sqlite3
|
|
import random
|
|
import time
|
|
import irc.strings
|
|
from irc.bot import SingleServerIRCBot
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from models import Log, Url, Model
|
|
from pymoxquizz import QuestionBank, Question
|
|
from os import listdir, path
|
|
|
|
|
|
DEBUG = True


def debug(msg):
    """Print *msg* to standard output, but only while DEBUG is truthy."""
    if not DEBUG:
        return
    print(msg)
|
|
|
|
|
|
class LolBot(SingleServerIRCBot):
    """
    An IRC bot to entertain the troops with MoxQuizz questions, log URLs, and
    other shenanigans.

    Instance state:
      - channel: the channel joined on welcome and used for quiz notices.
      - qb: list of questions loaded from the question directory.
      - quiz: 0 when idle, otherwise the number of the question being asked.
      - question: the question currently awaiting an answer, or None.
      - quiz_scores: dict of nick -> points for the running quiz, else None.
      - db_session: SQLAlchemy session factory bound to the SQLite database.
    """

    # Total score at which a player wins the quiz.
    WINNING_SCORE = 10

    # Number of URLs listed when ``list``/``urls`` is given no argument.
    # NOTE(review): the help text advertises 10; confirm which is intended.
    DEFAULT_URL_COUNT = '5'

    def __init__(self, channel, nickname, server, database, port=6667,
                 question_dir='questions'):
        """
        Connect-time setup: load the question banks and open the database.

        channel      -- channel to join once the server welcomes us.
        nickname     -- used as both the nickname and the real name.
        server, port -- address of the single IRC server to connect to.
        database     -- path of the SQLite database file.
        question_dir -- directory holding MoxQuizz question files; every
                        regular file in it is loaded.
        """
        debug("Instantiating SingleServerIRCBot")
        SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
        self.channel = channel

        # load some MoxQuizz questions
        # The bank is built per instance; a class-level list would be shared
        # (and grow) across every bot created in the same process.
        self.qb = []
        debug("Loading MoxQuizz questions")
        for name in listdir(question_dir):
            if not path.isfile(path.join(question_dir, name)):
                continue
            qfile = path.abspath(path.join(question_dir, name))
            debug(" - from MoxQuizz bank '%s'" % qfile)
            self.qb += QuestionBank(qfile).questions
        random.shuffle(self.qb)
        self.quiz = 0
        self.question = None
        self.quiz_scores = None

        # connect to the database
        debug("Connecting to SQLite database '%s'" % database)
        self.dbfile = database
        self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection)
        Model.metadata.bind = self.dbengine
        # Pass the engine explicitly rather than relying on bound metadata.
        Model.metadata.create_all(self.dbengine)
        self.db_session = sessionmaker(bind=self.dbengine)

        self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); <url> - adds the URL to the list; help - this message."
        debug("Exiting lolbot constructor")

    def _get_connection(self):
        """Creator function for SQLAlchemy."""
        connection = sqlite3.connect(self.dbfile)
        connection.text_factory = str
        debug("Creating SQLAlchemy connection")
        return connection

    def on_nicknameinuse(self, connection, event):
        """Retry with an underscore appended when the nickname is taken."""
        nick = connection.get_nickname()
        # Both placeholders need a value; a bare ``% nick`` raises TypeError.
        debug("Nick '%s' in use, trying '%s_'" % (nick, nick))
        connection.nick(nick + "_")

    def on_welcome(self, connection, event):
        """Join the configured channel once the server has welcomed us."""
        debug("Joining channel '%s'" % self.channel)
        connection.join(self.channel)

    def on_privmsg(self, connection, event):
        """Treat every private message as a command."""
        self.do_command(event, event.arguments[0])

    def on_pubmsg(self, connection, event):
        """
        Handle an event on the channel.
        Handle commands addressed to the bot.
        If there's a question, see if it's been answered.
        """
        text = event.arguments[0]
        addressee, colon, remainder = text.partition(":")
        own_nick = self.connection.get_nickname()

        # handle command, if addressed
        if colon and irc.strings.lower(addressee) == irc.strings.lower(own_nick):
            self.do_command(event, remainder.strip())
            return

        # deal with MoxQuizz question
        # Always credit the actual sender and check the whole line: text
        # before a colon (another nick, "http", ...) is not the speaker.
        if self.quiz:
            self.handle_quiz(event.source.nick, text)

    def start_quiz(self, nick):
        """Reset the scores, announce the quiz and ask the first question."""
        self.quiz = 0
        self.quiz_scores = dict()
        self.connection.notice(self.channel, 'Quiz begun by %s.' % nick)
        self.quiz_get_next()

    def stop_quiz(self):
        """Return to the idle state, discarding scores and the open question."""
        self.quiz = 0
        self.quiz_scores = None
        self.question = None

    def quiz_get_next(self):
        """Pick a random question, make it current and post it to the channel."""
        if not self.qb:
            # random.choice() raises IndexError on an empty bank.
            self.connection.notice(self.channel, "I don't know any questions.")
            self.stop_quiz()
            return
        self.quiz += 1
        self.question = random.choice(self.qb)
        debug(str(self.question.question))
        self.connection.notice(self.channel, str(self.question.question))

    def quiz_award_points(self, nick):
        """Announce the correct answer and add the question's score to nick."""
        points = self.question.score
        score = "%s point" % points
        if points != 1:
            score += "s"

        self.connection.notice(self.channel, 'Correct! The answer was %s. %s scores %s.' % (self.question.answer, nick, score))
        self.quiz_scores[nick] = self.quiz_scores.get(nick, 0) + points

    def quiz_check_win(self, nick):
        """End the quiz if nick has reached the winning score."""
        total = self.quiz_scores.get(nick, 0)
        # ">=" because a multi-point question can jump straight past the
        # target, which an equality test would never notice.
        if total >= self.WINNING_SCORE:
            self.connection.notice(self.channel, '%s wins with %s points!' % (nick, total))
            self.quiz_scoreboard()
            self.stop_quiz()

    def quiz_scoreboard(self):
        """Post every player's score to the channel."""
        scores = self.quiz_scores or {}
        self.connection.notice(self.channel, 'Scoreboard:')
        for nick, total in scores.items():
            score = "%s point" % total
            if total != 1:
                score += "s"
            self.connection.notice(self.channel, '%s has %s.' % (nick, score))
        if not scores:
            self.connection.notice(self.channel, 'So far, nobody has got anything right.')

    def handle_quiz(self, nick, message):
        """Check message as an answer to the open question on behalf of nick."""
        # bail if there's no quiz or unanswered question.
        if not self.quiz or not isinstance(self.question, Question):
            return

        # see if anyone answered correctly.
        if not self.question.attempt(message):
            return
        self.quiz_award_points(nick)
        self.quiz_check_win(nick)

        # if nobody has won, carry on
        # NOTE(review): the next question is asked only after a correct
        # answer; confirm this matches the intended flow.
        if self.quiz:
            # scores every 10 questions.
            if self.quiz % 10 == 0:
                self.quiz_scoreboard()
            self.quiz_get_next()

    def do_command(self, e, cmd):
        """
        Handle bot commands.

        e   -- the triggering event; replies go to e.source.nick or the channel.
        cmd -- the command text, without the bot's nick prefix.
        """
        nick = e.source.nick
        c = self.connection

        if cmd == "die":
            self.die()

        elif cmd == 'help':
            c.notice(nick, self.helptext)

        elif cmd == 'status':
            c.notice(self.channel, "I know %s questions." % len(self.qb))
            if self.quiz:
                c.notice(self.channel, "I am currently running a quiz.")
                self.quiz_scoreboard()

        elif cmd in ('halt', 'quit'):
            if self.quiz:
                self.quiz_scoreboard()
                self.stop_quiz()
                c.notice(self.channel, "Quiz halted by %s. Use ask to start a new one." % nick)
            else:
                c.notice(self.channel, "No quiz running.")

        elif cmd == 'scores':
            if self.quiz:
                self.quiz_scoreboard()

        elif cmd == 'ask':
            if self.quiz:
                c.notice(self.channel, "Quiz is running. Use halt or quit to stop.")
                c.notice(self.channel, str(self.question.question))
            elif isinstance(self.question, Question):
                c.notice(self.channel, "There is an unanswered question.")
                c.notice(self.channel, str(self.question.question))
            else:
                self.start_quiz(nick)

        elif cmd == 'revolt':
            if isinstance(self.question, Question):
                c.notice(self.channel, "Fine, the answer is: %s" % self.question.answer)
                self.quiz_get_next()

        elif cmd.startswith(('urls', 'list')):
            self._list_urls(nick, cmd)

        else:
            c.notice(nick, "Not understood: " + cmd)

    @staticmethod
    def _parse_url_window(spec):
        """
        Turn a ``list`` argument into a slice over the newest-first URL list.

        Returns None for "all", otherwise a slice. "n" selects the first n
        rows and "x-y" selects rows x through y (1-based, either order).
        Raises ValueError when a number cannot be parsed.
        """
        spec = spec.strip()
        if spec == "all":
            return None
        if spec.find("-") > 0:
            low, high = spec.split("-", 1)
            low, high = abs(int(low)), abs(int(high))
            if high < low:
                low, high = high, low
            # Clamp so "0-5" does not produce a negative slice start.
            return slice(max(low - 1, 0), high)
        return slice(None, abs(int(spec)))

    def _list_urls(self, nick, cmd):
        """Send nick the stored URLs selected by a ``list``/``urls`` command."""
        c = self.connection
        argument = cmd.partition(" ")[2].strip() or self.DEFAULT_URL_COUNT
        try:
            window = self._parse_url_window(argument)
        except ValueError:
            # Tell the user and stop; re-raising would push the error into
            # the IRC event dispatcher.
            c.notice(nick, "Give me a number or a range of numbers, e.g. list 5 or list 11-20")
            return

        db = self.db_session()
        try:
            query = db.query(Url).order_by(Url.timestamp.desc())
            rows = query if window is None else query[window]
            for url in rows:
                c.notice(nick, "%s %s" % (url.url, url.title))
                # Pause between notices (blocks the bot for a second per URL).
                time.sleep(1)
        finally:
            db.close()
|
|
|
|
|
|
def main():
    """
    Command-line entry point: parse the arguments and run the bot.

    Expects exactly four arguments: <server[:port]> <channel> <nickname> <db>.
    The port defaults to 6667. Prints a message and exits with status 1 when
    the argument count is wrong or the port is not a valid port number.
    """
    import sys

    if len(sys.argv) != 5:
        print("Usage: lolbot2.py <server[:port]> <channel> <nickname> <db>")
        sys.exit(1)

    server, colon, port_text = sys.argv[1].partition(":")
    port = 6667
    if colon:
        try:
            port = int(port_text)
            if not 0 < port < 65536:
                raise ValueError(port_text)
        except ValueError:
            print("Error: Erroneous port.")
            sys.exit(1)
    channel, nickname, database = sys.argv[2:5]

    debug("Parameters: server=%s port=%s nickname=%s channel=%s database=%s" % (server, port, nickname, channel, database))

    # The file's top-level imports name only irc.strings and irc.bot, so
    # import the submodules used here explicitly instead of relying on them
    # having been loaded transitively.
    import irc.buffer
    import irc.client

    # Decode incoming lines leniently instead of with the default buffer.
    irc.client.ServerConnection.buffer_class = irc.buffer.LenientDecodingLineBuffer
    bot = LolBot(channel, nickname, server, database, port)
    bot.start()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the bot only when executed as a script; an interrupt from the
    # keyboard ends the program quietly instead of printing a traceback.
    try:
        main()
    except KeyboardInterrupt:
        pass
|