Initial commit.
This commit is contained in:
commit
679f6b9bcb
3 changed files with 442 additions and 0 deletions
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
*.pyc
|
||||
*.log
|
||||
/*.db
|
||||
/*.conf
|
||||
10
lolbot.conf.sample
Normal file
10
lolbot.conf.sample
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
irc.server = irc.wgtn.cat-it.co.nz
|
||||
irc.channel = #lolbottest
|
||||
|
||||
# defaults:
|
||||
# irc.port = 6667
|
||||
# irc.nickname = lolbot
|
||||
|
||||
# sqlite database filename
|
||||
db.file = lolbot.db
|
||||
|
||||
428
lolbot.py
Executable file
428
lolbot.py
Executable file
|
|
@ -0,0 +1,428 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# LolBot
|
||||
#
|
||||
# New version based on Twisted IRC.
|
||||
|
||||
"""
|
||||
Useful bot for folks stuck behind censor walls at work
|
||||
Logs a channel and collects URLs for later.
|
||||
"""
|
||||
|
||||
try:
|
||||
import sys
|
||||
import os
|
||||
import string
|
||||
import re
|
||||
import random
|
||||
import time
|
||||
import getopt
|
||||
import sqlite3
|
||||
from twisted.words.protocols import irc
|
||||
from twisted.internet import protocol
|
||||
from twisted.internet import reactor
|
||||
from datetime import datetime
|
||||
from mechanize import Browser
|
||||
from sqlalchemy import MetaData, Table, Column, String, Text, Integer, DateTime, create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
except ImportError:
|
||||
print "Some modules missing: Lolbot relies on Twisted IRC, Mechanize and SQLAlchemy.\n"
|
||||
sys.exit
|
||||
|
||||
SqlBase = declarative_base()
|
||||
|
||||
class Log(SqlBase):
|
||||
"""
|
||||
This class represents an event in the log table and inherits from a SQLAlchemy
|
||||
convenience ORM class.
|
||||
"""
|
||||
|
||||
__tablename__ = "log"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
timestamp = Column(DateTime)
|
||||
nickname = Column(String(20))
|
||||
text = Column(Text)
|
||||
|
||||
def __init__(self, nickname, text, timestamp=None):
|
||||
"""
|
||||
Creates an event log for the IRC logger.
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.now()
|
||||
self.timestamp = timestamp
|
||||
self.nickname = nickname
|
||||
self.text = text
|
||||
|
||||
def __repr__(self):
|
||||
return "(%s) %s: %s" % (self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), self.nickname, self.text)
|
||||
|
||||
class Url(SqlBase):
|
||||
"""
|
||||
This class represents a saved URL and inherits from a SQLAlchemy convenience
|
||||
ORM class.
|
||||
"""
|
||||
|
||||
__tablename__ = "url"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
timestamp = Column(DateTime)
|
||||
nickname = Column(String(20))
|
||||
url = Column(String(200), unique=True)
|
||||
title = Column(Text)
|
||||
|
||||
def __init__(self, nickname, url, title=None, timestamp=None):
|
||||
if timestamp is None:
|
||||
timestamp = datetime.now()
|
||||
self.timestamp = timestamp
|
||||
self.nickname = nickname
|
||||
self.url = url
|
||||
self.title = title
|
||||
|
||||
# populate the title from the URL if not given.
|
||||
if title is None:
|
||||
try:
|
||||
br = Browser()
|
||||
br.open(self.url)
|
||||
self.title = br.title()
|
||||
except Exception as ex:
|
||||
self.title = ''
|
||||
|
||||
def __repr__(self):
|
||||
if not self.title:
|
||||
return "%s: %s" % (self.nickname, self.url)
|
||||
else:
|
||||
return "%s: %s - %s" % (self.nickname, self.url, self.title)
|
||||
|
||||
class LolBot(irc.IRCClient):
|
||||
"""
|
||||
The Lolbot itself.
|
||||
"""
|
||||
|
||||
def _get_connection(self):
|
||||
connection = sqlite3.Connection(self.config['db.file'])
|
||||
connection.text_factory = str
|
||||
return connection
|
||||
|
||||
def created(self, when):
|
||||
# connect to the database
|
||||
self.dbengine = create_engine('sqlite+pysqlite://', creator=self._get_connection)
|
||||
SqlBase.metadata.bind = self.dbengine
|
||||
SqlBase.metadata.create_all()
|
||||
self.get_session = sessionmaker(bind=self.dbengine)
|
||||
|
||||
self.helptext = "Keeps a list of URLs. Commands: list [n|x-y] - prints the last 10 URLs (or n URLs, or x through y); clear - clears the list; lol - say something funny; <url> - adds the URL to the list; help - this message."
|
||||
|
||||
def now(self):
|
||||
return datetime.today().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
def save_url(self, nickname, url):
|
||||
title = False
|
||||
try:
|
||||
db = self.get_session()
|
||||
if not db.query(Url).filter(Url.url == url).count():
|
||||
theurl = Url(nickname, url)
|
||||
db.add(theurl)
|
||||
db.commit()
|
||||
else:
|
||||
theurl = db.query(Url).filter(Url.url == url).one()
|
||||
print theurl
|
||||
title = theurl.title
|
||||
except Exception, ex:
|
||||
print "Exception caught saving URL: %s" % ex
|
||||
return title
|
||||
|
||||
def log_event(self, nick, text):
|
||||
try:
|
||||
entry = Log(nick, text)
|
||||
db = self.get_session()
|
||||
db.add(entry)
|
||||
db.commit()
|
||||
print entry
|
||||
except Exception, ex:
|
||||
print "Exception caught logging event: %s" % ex
|
||||
|
||||
def _get_nickname(self):
|
||||
return self.factory.nickname
|
||||
nickname = property(_get_nickname)
|
||||
|
||||
def _get_channel(self):
|
||||
return self.factory.channel
|
||||
channel = property(_get_channel)
|
||||
|
||||
def _get_config(self):
|
||||
return self.factory.config
|
||||
config = property(_get_config)
|
||||
|
||||
def signedOn(self):
|
||||
self.join(self.channel)
|
||||
print "Signed on as %s." % (self.nickname,)
|
||||
|
||||
def joined(self, channel):
|
||||
print "Joined %s." % (channel,)
|
||||
|
||||
def privmsg(self, user, channel, msg):
|
||||
user = user.split('!')[0]
|
||||
if channel == self.nickname:
|
||||
# Private /msg from a user
|
||||
self.do_command(msg, user)
|
||||
else:
|
||||
# log it
|
||||
self.log_event(user, msg)
|
||||
|
||||
args = string.split(msg, ":", 1)
|
||||
if len(args) > 1 and args[0] == self.nickname:
|
||||
self.do_command(string.strip(args[1]))
|
||||
else:
|
||||
# parse it for links, add URLs to the list
|
||||
words = msg.split(" ")
|
||||
for w in words:
|
||||
|
||||
# URL
|
||||
if w.startswith('http://') or w.startswith('https://'):
|
||||
title = self.save_url(user, w)
|
||||
if title != False:
|
||||
self.say_public("URL added. %s" % title)
|
||||
|
||||
# Moodle tracker bugs
|
||||
elif w.startswith('MDL-'):
|
||||
m = re.search('^MDL-([0-9]+)$', w)
|
||||
if m:
|
||||
trackerurl = 'https://tracker.moodle.org/browse/%s' % w
|
||||
title = self.save_url(user, trackerurl)
|
||||
if title != False:
|
||||
self.say_public("%s - %s" % (trackerurl, title))
|
||||
|
||||
# Work requests
|
||||
elif w.startswith('WR'):
|
||||
m = re.search('^WR#?([0-9]+)$', w)
|
||||
if m:
|
||||
wrmsurl = 'https://wrms.catalyst.net.nz/%s' % m.groups()[0]
|
||||
title = self.save_url(user, wrmsurl)
|
||||
if title != False:
|
||||
self.say_public("%s - %s" % (wrmsurl, title))
|
||||
|
||||
# Bugzilla
|
||||
elif w.startswith('BUG'):
|
||||
m = re.search('^BUG#?([0-9]+)$', w)
|
||||
if m:
|
||||
wrmsurl = 'http://bugzilla.catalyst.net.nz/show_bug.cgi?id=%s' % m.groups()[0]
|
||||
title = self.save_url(user, wrmsurl)
|
||||
if title != False:
|
||||
self.say_public("%s - %s" % (wrmsurl, title))
|
||||
|
||||
def say_public(self, text):
|
||||
"Print TEXT into public channel, for all to see."
|
||||
self.notice(self.channel, text)
|
||||
self.log_event(self.nickname, text)
|
||||
|
||||
def say_private(self, nick, text):
|
||||
"Send private message of TEXT to NICK."
|
||||
self.notice(nick, text)
|
||||
|
||||
def reply(self, text, to_private=None):
|
||||
"Send TEXT to either public channel or TO_PRIVATE nick (if defined)."
|
||||
if to_private is not None:
|
||||
self.say_private(to_private, text)
|
||||
else:
|
||||
self.say_public(text)
|
||||
|
||||
def do_command(self, cmd, target=None):
|
||||
"""
|
||||
This is the function called whenever someone sends a public or
|
||||
private message addressed to the bot. (e.g. "bot: blah").
|
||||
"""
|
||||
|
||||
try:
|
||||
if cmd == 'help':
|
||||
self.reply(self.helptext, target)
|
||||
|
||||
elif cmd == 'urls' or cmd == 'list':
|
||||
db = self.get_session()
|
||||
for url in db.query(Url).order_by(Url.timestamp.desc())[:10]:
|
||||
line = "%s %s" % (url.url, url.title)
|
||||
self.reply(line, target)
|
||||
time.sleep(1)
|
||||
|
||||
elif cmd.startswith('urls ') or cmd.startswith('list '):
|
||||
db = self.get_session()
|
||||
(listcmd, n) = cmd.split(" ", 1)
|
||||
n = n.strip()
|
||||
if n == "all":
|
||||
rows = db.query(Url).order_by(Url.timestamp.desc())
|
||||
elif n.find("-") > 0:
|
||||
(x, y) = n.split("-", 1)
|
||||
try:
|
||||
x = abs(int(x))
|
||||
y = abs(int(y))
|
||||
if y < x:
|
||||
x, y = y, x
|
||||
except ValueError, ex:
|
||||
self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target)
|
||||
raise ex
|
||||
rows = db.query(Url).order_by(Url.timestamp.desc())[x-1:y]
|
||||
else:
|
||||
try:
|
||||
n = abs(int(n))
|
||||
except ValueError, ex:
|
||||
self.reply("Give me a number or a range of numbers, e.g. list 5 or list 11-20", target)
|
||||
raise ex
|
||||
rows = db.query(Url).order_by(Url.timestamp.desc())[:n]
|
||||
|
||||
for url in rows:
|
||||
line = "%s %s" % (url.url, url.title)
|
||||
self.reply(line, target)
|
||||
time.sleep(1)
|
||||
|
||||
elif cmd.startswith('http:') or cmd.startswith('https:'):
|
||||
title = self.save_url(from_private, cmd)
|
||||
if title == False:
|
||||
self.say_public("Sorry, I'm useless at UTF-8.")
|
||||
else:
|
||||
self.reply('URL added. %s' % title, target)
|
||||
|
||||
else:
|
||||
self.reply("I'm a bot of little brain. Try help for more information.", target)
|
||||
|
||||
except Exception, ex:
|
||||
print "Exception caught processing command: %s" % ex
|
||||
print " command was '%s' from %s" % (cmd, target)
|
||||
self.reply("Sorry, I didn't understand: %s" % cmd, target)
|
||||
self.reply(self.helptext, target)
|
||||
|
||||
class LolBotFactory(protocol.ClientFactory):
|
||||
protocol = LolBot
|
||||
|
||||
def __init__(self, config_path):
|
||||
self.config = get_config(config_path)
|
||||
self.server = self.config['irc.server']
|
||||
self.port = self.config['irc.port']
|
||||
self.nickname = self.config['irc.nickname']
|
||||
self.channel = self.config['irc.channel']
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
print "Lost connection (%s), reconnecting." % (reason,)
|
||||
connector.connect()
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
print "Could not connect: %s" % (reason,)
|
||||
|
||||
def get_options():
|
||||
try:
|
||||
(options, args) = getopt.getopt(sys.argv[1:], "hc:", ["help", "config=", ])
|
||||
except getopt.GetoptError, err:
|
||||
print str(err)
|
||||
usage()
|
||||
sys.exit(2)
|
||||
|
||||
config_path = ""
|
||||
for option,value in options:
|
||||
if option in ("-h", "--help"):
|
||||
usage()
|
||||
sys.exit(2)
|
||||
if option in ("-c", "--config"):
|
||||
config_path = value
|
||||
return { 'config_path': config_path }
|
||||
|
||||
def get_config(config_path):
|
||||
"""
|
||||
This method loads configuration options from a lolbot.conf file. The file
|
||||
should look like this:
|
||||
|
||||
irc.server = irc.freenode.net
|
||||
irc.port = 6667
|
||||
irc.channel = #lolbottest
|
||||
irc.nickname = lolbot
|
||||
db.url = sqlite:///lolbot.db
|
||||
|
||||
"""
|
||||
|
||||
if not config_path:
|
||||
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lolbot.conf")
|
||||
if not os.path.exists(config_path):
|
||||
print "Error: configuration file not found. By default lolbot will look for a lolbot.conf file in the same directory as the lolbot script, but you can override this by specifying a path on the command line with the --config option."
|
||||
usage()
|
||||
sys.exit(1)
|
||||
|
||||
# open the configuration file and grab all param=value declarations.
|
||||
config = {}
|
||||
with open(config_path) as f:
|
||||
for line in f:
|
||||
# skip comments
|
||||
if line.strip().startswith("#") or line.strip() == "":
|
||||
continue
|
||||
|
||||
# collect up param = value
|
||||
try:
|
||||
(param, value) = line.strip().split("=", 1)
|
||||
if param.strip() != "":
|
||||
config[param.strip()] = value.strip()
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
# validate IRC host
|
||||
if "irc.server" not in config.keys():
|
||||
print "Error: the IRC server was not specified. Use --help for more information."
|
||||
sys.exit(1)
|
||||
|
||||
# validate IRC port
|
||||
if "irc.port" not in config.keys():
|
||||
config["irc.port"] = "6667"
|
||||
try:
|
||||
config["irc.port"] = int(config["irc.port"])
|
||||
except ValueError:
|
||||
print "Error: the IRC port must be an integer. If not specified, lolbot will use the default IRC port value 6667. Use --help for more information."
|
||||
sys.exit(1)
|
||||
|
||||
# validate IRC channel
|
||||
if "irc.channel" not in config.keys() or not config["irc.channel"].startswith("#"):
|
||||
print "Error: the IRC channel is not specified or incorrect. It must begin with a # - e.g. #mychatchannel. Use --help for more information."
|
||||
sys.exit(1)
|
||||
|
||||
# validate bot nickname
|
||||
if "irc.nickname" not in config.keys():
|
||||
config["irc.nickname"] = "lolbot"
|
||||
|
||||
return config
|
||||
|
||||
def usage():
|
||||
print """Run a lolbot.
|
||||
|
||||
-h, --help
|
||||
This message.
|
||||
|
||||
-c, --config=<filename>
|
||||
Specify a configuration file. Defaults to lolbot.conf in the same
|
||||
directory as the script.
|
||||
|
||||
Configuration:
|
||||
|
||||
irc.server = <host>
|
||||
The IRC server, e.g. irc.freenode.net
|
||||
|
||||
irc.port = <port>
|
||||
The IRC server port. Default: 6667
|
||||
|
||||
irc.channel = <channel>
|
||||
The chat channel to join, e.g. #trainspotting
|
||||
|
||||
irc.nickname = <nickname>
|
||||
The nickname for your lolbot. Default: "lolbot"
|
||||
|
||||
db.url = <url>
|
||||
The URL to your lolbot database. This can be any valid URL accepted
|
||||
by SQL Alchemy. If not specified, lolbot will attempt to use a
|
||||
SQLite database called lolbot.db in the same directory as the
|
||||
script.
|
||||
"""
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = get_options()
|
||||
config = get_config(args['config_path'])
|
||||
try:
|
||||
reactor.connectTCP(config['irc.server'], config['irc.port'], LolBotFactory(args['config_path']))
|
||||
reactor.run()
|
||||
except KeyboardInterrupt:
|
||||
print "Shutting down."
|
||||
|
||||
Loading…
Add table
Reference in a new issue