# Quoins - A TurboGears blogging system. # Copyright (C) 2008-2009 James E. Blair # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from tg import controllers, expose, flash, require, validate, request, redirect, session from tg import TGController from pylons.controllers.util import abort from repoze.what import predicates from tw.api import WidgetsList import tw.forms as forms import tw.forms.fields as fields import tw.forms.validators as validators from webhelpers.feedgenerator import Atom1Feed, Rss201rev2Feed from model import * import pylons import cgi from genshi.input import HTML from genshi.filters import HTMLSanitizer from urlparse import urlsplit import os.path from tw.tinymce import TinyMCE import smtplib from email.mime.text import MIMEText from threading import Thread import logging log = logging.getLogger('quoins') import openid.consumer.consumer import openid.server.server import openid.extensions.sreg from openid.store.sqlstore import MySQLStore import types import xmlrpclib, sys, re from linkback import LinkBackHandler, PingBackURI, TrackBackURI import base64 import urllib import openid_controllers from utils import get_oid_connection def b64encode(x): return base64.encodestring(x)[:-1] def fix_url(url): parts = urlsplit(url) if not parts[0]: url = 'http://'+url i = url.find('://') if '/' not in url[i+3:]: url = url + '/' return url def send_email(msg, frm, to): host = tg.config.get('quoins.mailserver', None) port = tg.config.get('quoins.mailport', 0) helo = tg.config.get('quoins.mailhelo', None) user = tg.config.get('quoins.mailuser', None) pswd = tg.config.get('quoins.mailpass', None) if not host: log.warn('Quoins email notifications are not configured') return #XXX log not sending mail s = smtplib.SMTP(host, port, helo) if user and pswd: s.login(user, pswd) s.sendmail(frm, [to], msg.as_string()) s.close() log.info('Sent mail to: %s' % to) class QuoinsName(validators.FancyValidator): messages = { 'percent': 'Names with %% are not permitted', 'in_use': 'This name is in use', 'anonymous': 'The name anonymous is not permitted', 'openid': 'Names beginning with "OpenID" are not permitted', } def _to_python(self, value, state): # Leading or trailing whitespace in a name is not interesting. return value.strip() def validate_python(self, value, state): if not value: return None if '%' in value: raise validators.Invalid(self.message("percent", state), value, state) if DBSession.query(TGUser).filter_by(display_name=value).first(): raise validators.Invalid(self.message("in_use", state), value, state) if value.lower()=='anonymous': raise validators.Invalid(self.message("anonymous", state), value, state) if value.lower().startswith('openid'): raise validators.Invalid(self.message("openid", state), value, state) class SimpleForm(forms.Form): template = """
""" class BlogPostForm(SimpleForm): submit_text = "Post" class fields(WidgetsList): post_id = fields.HiddenField() title = fields.TextField(validator=validators.NotEmpty(), attrs=dict(size=60)) tags = fields.TextField(attrs=dict(size=30)) comments = fields.CheckBox(label='Allow comments') body = TinyMCE(validator=validators.NotEmpty(), new_options = dict( plugins = "media", convert_urls = False )) ## plugins = "pagebreak", ## pagebreak_separator = "", ## )) file = fields.FileField(label='Upload image') save = fields.SubmitButton(label=' ', default='Upload or save draft', validator=validators.FancyValidator(if_missing=''), named_button=True) blog_post_form = BlogPostForm() class OpenIDField(fields.TextField): template = """ """ params = ["attrs", "img_url"] params_doc = {'attrs' : 'Dictionary containing extra (X)HTML attributes for' ' the input tag', 'img_url' : 'The URL for the OpenID image',} attrs = {} img_url = lambda x: tg.url('/images/openid_small_logo.png') class BlogCommentForm(SimpleForm): submit_text = "Comment" class fields(WidgetsList): id = fields.HiddenField() name = fields.TextField(validator=QuoinsName()) url = OpenIDField(help_text='Enter your website or your OpenID here.') body = fields.TextArea(validator=validators.NotEmpty()) blog_comment_form = BlogCommentForm() class Feed(TGController): def get_feed_data(self, **kwargs): # get the latest five blog entries in reversed order from SQLobject blog = DBSession.query(Blog).get(1) entries = [] if kwargs.has_key('author'): posts = blog.getPostsByAuthor(kwargs['author'])[:5] else: posts = blog.published_posts[:5] for post in posts: e = {} e["title"] = post.title e["published"] = post.created e["author"] = post.author.display_name e["email"] = post.author.email_address e["link"] = self.blog_controller.absolute_url(post) e["summary"] = post.short_body tags = [tag.name for tag in post.tags] if tags: e["categories"] = tags else: e["categories"] = None entries.append(e) if blog.subtitle: subtitle = blog.subtitle else: subtitle = '' bloginfo = dict( title = blog.title, subtitle = subtitle, link = self.blog_controller.absolute_url(), description = u'The latest entries from %s' % blog.title, id = self.blog_controller.absolute_url(), entries = entries ) return bloginfo @expose(content_type='application/atom+xml') def atom1_0(self, **kw): # fix for http://trac.turbogears.org/ticket/2351 if request.headers.get('Accept', '') == 'text/html': abort(500, "This page returns application/atom+xml, please set your Accept header appropriately.") info = self.get_feed_data(**kw) feed = Atom1Feed( title=info['title'], link=info['link'], description=info['description'], language=u"en", ) for entry in info['entries']: feed.add_item(title=entry['title'], link=entry['link'], description=entry['summary'], author_name=entry['author'], author_email=entry['email'], pubdate=entry['published'], categories=entry['categories']) return feed.writeString('utf-8') @expose(content_type='application/rss+xml') def rss2_0(self, **kw): # fix for http://trac.turbogears.org/ticket/2351 if request.headers.get('Accept', '') == 'text/html': abort(500, "This page returns application/rss+xml, please set your Accept header appropriately.") info = self.get_feed_data(**kw) feed = Rss201rev2Feed( title=info['title'], link=info['link'], description=info['description'], language=u"en", ) for entry in info['entries']: feed.add_item(title=entry['title'], link=entry['link'], description=entry['summary'], author_name=entry['author'], author_email=entry['email'], pubdate=entry['published'], categories=entry['categories']) return feed.writeString('utf-8') class Pingback(TGController): # The index method is based on code at: # http://www.dalkescientific.com/writings/diary/archive/2006/10/24/xmlrpc_in_turbogears.html @expose(content_type='text/xml') def index(self): # fix for http://trac.turbogears.org/ticket/2351 if request.headers.get('Accept', '') == 'text/html': abort(500, "This page returns text/xml, please set your Accept header appropriately.") try: params, method = xmlrpclib.loads(request.body) log.debug('Pingback method: %s' % method) if method != "pingback.ping": raise AssertionError("method does not exist") # Call the method, convert it into a 1-element tuple # as expected by dumps response = self.ping(*params) response = xmlrpclib.dumps((response,), methodresponse=1) except xmlrpclib.Fault, fault: # Can't marshal the result response = xmlrpclib.dumps(fault) except: # Some other error; send back some error info response = xmlrpclib.dumps( xmlrpclib.Fault(0, "%s:%s" % (sys.exc_type, sys.exc_value)) ) log.info('Pingback response: %s' % response) return response post_re = re.compile(r'^.*/post/(\d+)$') def ping(self, sourceURI, targetURI): m = self.post_re.match(targetURI) if not m: return xmlrpclib.Fault(0x21, 'Unable to parse targetURI.') id = int(m.group(1)) post = DBSession.query(Post).get(id) if not post: return xmlrpclib.Fault(0x20, 'Post not found.') elif not post.allow_comments: return xmlrpclib.Fault(0x31, 'Comments are closed on this post.') for lb in post.comments: if lb.url and lb.url == sourceURI: return xmlrpclib.Fault(0x30, 'Pingback already registered.') lb = Comment() lb.approved = True lb.type = 'pingback' lb.post = post lb.url = sourceURI post.comments.append(lb) return 'Linkback recorded.' def post_paginate(start, posts, size): start=int(start) if start < 0: start = 0 if start > len(posts): start = len(posts)-size out_posts = posts[start:start+size] next = prev = None if len(posts)>start+size: next = start+size if start: prev = start-size if prev < 0: prev = 0 return dict(posts = out_posts, prev = prev, next = next) ID_RE = re.compile(r'.*?/author/([^/]*)') class BlogController(TGController): feed = Feed() pingback = Pingback() def __init__(self, *args, **kw): self.path = kw.pop('path', '/') self.post_paginate = kw.pop('paginate', 5) get_name_from_id = kw.pop('get_name_from_id', self.get_username_from_openid) self.openid = openid_controllers.OpenIDController(path=os.path.join(self.path, 'openid/'), quoins=self, get_name_from_id = get_name_from_id) self.feed.blog_controller = self super(BlogController, self).__init__(*args, **kw) def get_username_from_openid(self, id): m = ID_RE.match(id) return m.group(1) def url(self, obj=None): if obj is None: u = tg.url(self.path) elif isinstance(obj, basestring): try: obj.encode('ascii') except: obj = urllib.quote(obj.encode('utf8')) if obj.startswith('/'): obj = obj[1:] u = tg.url(os.path.join(self.path, obj)) elif isinstance(obj, Post): u = tg.url(os.path.join(self.path, 'post', str(obj.id))) elif isinstance(obj, Media): u = tg.url(os.path.join(self.path, 'media', str(obj.post.id), str(obj.name))) elif isinstance(obj, TGUser): u = tg.url(os.path.join(self.path, 'author', str(obj.user_name))) return u def absolute_url(self, obj=None): u = self.url(obj) port = tg.config.get('server.webport') if port == '80': port = '' else: port = ':'+port return 'http://%s%s%s'%(tg.config.get('server.webhost'), port, u) def comment_author_url(self, comment): if (hasattr(comment, 'author') and comment.author and (not comment.url)): return self.url(comment.author) return comment.url def get_html(self, data): return HTML(data) def get_trackback_rdf(self, post, comment=True): rdf = """ \n""" rdf = rdf % (self.absolute_url('/'), self.absolute_url(post), post.title, self.absolute_url('/trackback/%s'%post.id)) if comment: rdf = "" return HTML(rdf) def send_comment_email(self, comment): post = comment.post blog = post.blog fromaddr = tg.config.get('quoins.mailfrom', '<>') toaddr = post.author.email_address d = {} d['blog_title'] = blog.title d['post_title'] = post.title d['name'] = comment.name d['url'] = comment.url d['comment'] = comment.body if comment.approved: d['approval'] = "Approval is not required for this comment." else: d['approval'] = "This comment is not yet approved. To approve it, visit:\n\n" d['approval'] += " %s" % self.absolute_url('/unapproved_comments') message = """ A new comment has been posted to the %(blog_title)s post "%(post_title)s". %(approval)s Name: %(name)s URL: %(url)s Comment: %(comment)s """ % d msg = MIMEText(message) msg['From'] = fromaddr msg['To'] = toaddr msg['Subject'] = "New comment on %s" % post.title t = Thread(target=send_email, args=(msg, fromaddr, toaddr)) t.start() @expose(template="genshi:quoinstemplates.index") def index(self, start=0): pylons.response.headers['X-XRDS-Location']=self.absolute_url('/yadis') blog = DBSession.query(Blog).get(1) d = post_paginate(start, blog.published_posts, self.post_paginate) d.update(dict(quoins = self, blog = blog, post = None, author = None, )) return d @expose(template="genshi:quoinstemplates.index") def tag(self, tagname, start=0): try: start=int(start) except: abort(404) blog = DBSession.query(Blog).get(1) posts = blog.getPostsByTag(tagname) d = post_paginate(start, posts, self.post_paginate) d.update(dict(quoins = self, blog = blog, post = None, author = None)) return d @expose(template="genshi:quoinstemplates.index") def archive(self, year='', month='', day='', start=0): try: year = int(year) except: year = None try: month = int(month) except: month = None try: day = int(day) except: day = None if not year: abort(404) blog = DBSession.query(Blog).get(1) posts = blog.getPostsByDate(year, month, day) d = post_paginate(start, posts, self.post_paginate) d.update(dict(quoins = self, blog = blog, post = None, author = None)) return d @expose(template="genshi:quoinstemplates.index") def author(self, name='', start=0): try: start=int(start) except: abort(404) if not name: abort(404) blog = DBSession.query(Blog).get(1) posts = blog.getPostsByAuthor(name) if not posts: abort(404) d = post_paginate(start, posts, self.post_paginate) d.update(dict(quoins = self, blog = blog, post = None, author = name)) return d @expose(template="genshi:quoinstemplates.index") @require(predicates.has_permission('blog-post')) def unpublished_posts(self, start=0): blog = DBSession.query(Blog).get(1) posts = blog.unpublished_posts d = post_paginate(start, posts, self.post_paginate) d.update(dict(quoins = self, blog = blog, post = None, author = None)) return d @expose(template="genshi:quoinstemplates.post") def post(self, id): post = DBSession.query(Post).get(id) if not post: abort(404) pylons.response.headers['X-Pingback']=self.absolute_url('/pingback/') return dict(quoins = self, blog = post.blog, post = post) @expose(template="genshi:quoinstemplates.new_comment") def new_comment(self, id): post = DBSession.query(Post).get(id) if not post: abort(404) if not post.allow_comments: flash('This post does not allow comments.') redirect(self.url(post)) return dict(quoins = self, blog = post.blog, post = post, form = blog_comment_form, action = self.url('save_comment'), defaults = {'id':id}) @expose() def yadis(self): doc = """ http://specs.openid.net/auth/2.0/return_to %s """ % self.absolute_url('oid_comment') return doc @expose() @validate(form=blog_comment_form, error_handler=new_comment) def save_comment(self, id, name='', url='', body=''): post = DBSession.query(Post).get(id) if not post.allow_comments: flash('This post does not allow comments.') redirect(self.url(post)) if not name: name = 'Anonymous' if url: store = MySQLStore(get_oid_connection()) con = openid.consumer.consumer.Consumer(session, store) url = fix_url(str(url)) try: req = con.begin(url) req.addExtensionArg('http://openid.net/sreg/1.0', 'optional', 'fullname,nickname,email') oid_url = req.redirectURL(self.absolute_url(), self.absolute_url('oid_comment')) session['oid_comment_body']=body session['oid_comment_name']=name session['oid_comment_post']=id session['oid_comment_url']=url session.save() redirect(oid_url) except openid.consumer.consumer.DiscoveryFailure: # treat as anonymous content without openid pass c = Comment() post.comments.append(c) c.post = post c.body = body if request.identity: c.author=request.identity['user'] c.approved = True flash('Your comment has been posted.') else: c.name = name if url: c.url = url flash('Your comment has been posted and is awaiting moderation.') self.send_comment_email(c) redirect(self.url(post)) @expose() def oid_comment(self, **kw): store = MySQLStore(get_oid_connection()) con = openid.consumer.consumer.Consumer(session, store) port = tg.config.get('server.webport') if port == '80': port = '' else: port = ':'+port path = 'http://%s%s%s'%(tg.config.get('server.webhost'), port, request.path) ret = con.complete(kw, path) session.save() if ret.status == openid.consumer.consumer.SUCCESS: sreg = ret.extensionResponse('http://openid.net/sreg/1.0', True) name = session['oid_comment_name'] if sreg.has_key('fullname'): name = sreg.get('fullname') elif sreg.has_key('nickname'): name = sreg.get('nickname') body = session['oid_comment_body'] id = session['oid_comment_post'] url = session['oid_comment_url'] name = unicode(name) post = DBSession.query(Post).get(id) if not post.allow_comments: flash('This post does not allow comments.') redirect(self.url(post)) if name and ('%' in name or DBSession.query(TGUser).filter_by(display_name=name).first() or name.lower()=='anonymous'): name = u'OpenID: %s'%name c = Comment() c.post = post post.comments.append(c) c.approved = True c.body = body c.url = url c.openid = url c.name = name flash('Your comment has been posted.') self.send_comment_email(c) redirect(self.url(post)) else: id = session['oid_comment_post'] post = DBSession.query(Post).get(id) flash('OpenID authentication failed') redirect(self.url('new_comment/%s'%post.id)) @expose(template="genshi:quoinstemplates.delete_comment") @require(predicates.has_permission('blog-post')) def delete_comment(self, id, confirm=None): comment = DBSession.query(Comment).get(id) post = comment.post if confirm: DBSession.delete(comment) flash('Comment deleted.') redirect(self.url(post)) return dict(quoins = self, blog = comment.post.blog, post = comment.post, comment = comment) @expose(template="genshi:quoinstemplates.unapproved_comments") @require(predicates.has_permission('blog-post')) def unapproved_comments(self): blog = DBSession.query(Blog).get(1) return dict(quoins = self, blog=blog, post=None, comments = blog.unapproved_comments) @expose() @require(predicates.has_permission('blog-post')) def approve_comments(self, **kwargs): for name, value in kwargs.items(): if not name.startswith('comment_'): continue if value == 'ignore': continue c, id = name.split('_') comment = DBSession.query(Comment).get(int(id)) if value == 'approve': comment.approved = True if value == 'delete': DBSession.delete(comment) flash('Your changes have been saved.') redirect(self.url()) @expose(content_type=controllers.CUSTOM_CONTENT_TYPE) def media(self, post_id, name): # Apparently TG2 is adopting Zope misfeatures and we # don't get .ext in our name argument. name = request.environ['PATH_INFO'].split('/')[-1] post = DBSession.query(Post).get(post_id) if not post: abort(404) media = None for m in post.media: if m.name==name: media = m if not media: abort(404) pylons.response.headers['Content-Type'] = media.mimetype return str(media.data) @expose() @require(predicates.has_permission('blog-post')) def delete_media(self, post_id, ids=[]): if type(ids) != type([]): ids = [ids] for id in ids: media = DBSession.query(Media).get(id) DBSession.delete(media) DBSession.flush() flash('Deleted image') redirect(self.url('edit_post/%s'%post_id)) @expose(template="genshi:quoinstemplates.new_post") @require(predicates.has_permission('blog-post')) def new_post(self, **args): blog = DBSession.query(Blog).get(1) return dict(quoins = self, blog = blog, post = None, form = blog_post_form, defaults = {'comments':blog.allow_comments}) @expose(template="genshi:quoinstemplates.delete_post") @require(predicates.has_permission('blog-post')) def delete_post(self, id, confirm=None): post = DBSession.query(Post).get(id) if confirm: for tag in post.tags[:]: post.untag(tag.name) DBSession.delete(post) flash('Post deleted.') redirect(self.url('/')) return dict(quoins = self, blog = post.blog, post = post) @expose(template="genshi:quoinstemplates.new_post") @require(predicates.has_permission('blog-post')) def edit_post(self, id): post = DBSession.query(Post).get(id) body = post.body if post.teaser: body = post.teaser + '\n----\n' + body tags = ' '.join([t.name for t in post.tags]) return dict(quoins = self, blog = post.blog, form = blog_post_form, post = post, defaults = {'comments':post.allow_comments, 'title':post.title, 'body':body, 'tags':tags, 'post_id':post.id}) @expose(template="genshi:quoinstemplates.save_post") @validate(form=blog_post_form, error_handler=new_post) @require(predicates.has_permission('blog-post')) def save_post(self, title, body, comments='', tags='', post_id='', file=None, save='', submit=''): flashes = [] body = body.encode('utf8') if not tags: tags = '' if post_id: # Editing a post post_id = int(post_id) p = DBSession.query(Post).get(post_id) blog = p.blog for t in p.tags[:]: p.untag(t.name) else: # Making a new post blog = DBSession.query(Blog).get(1) p = Post() p.author=request.identity['user'] p.blog=blog DBSession.add(p) p.title = title p.body = body teaser = '' newbody = '' body = body.replace('\r', '') for line in body.split('\n'): if line.strip()=='----' and not teaser: teaser = newbody.strip()+'\n' newbody = '' else: newbody += line+'\n' if teaser and newbody: p.teaser = teaser if teaser and not newbody: newbody = teaser p.body = newbody.strip()+'\n' if comments: p.allow_comments=True else: p.allow_comments=False for tag in tags.split(): tag = tag.lower().strip() p.tag(tag) if file is not None: data = file.file.read() if data: media = Media() media.post = p media.name = file.filename media.mimetype = file.type media.data = data DBSession.add(media) flashes.append('File uploaded.') if save: p.published = False else: p.published = True DBSession.flush() if p.published: handler = LinkBackHandler() uris = handler.findURIs(body) session['linkback_uris']=uris session.save() myuris = [] for (uri, title, lbs) in uris: best = None for lb in lbs: if isinstance(lb, TrackBackURI): best = lb lb.type = 'TrackBack' if isinstance(lb, PingBackURI): lb.type = 'PingBack' myuris.append((uri, b64encode(uri), title, lbs, best)) flashes.append("Published post.") else: flashes.append("Saved draft post.") flash('\n'.join(flashes)) if not p.published: return redirect('./edit_post/%s'%(p.id)) return dict(quoins=self, blog=blog, post=p, uris=myuris) @expose() @require(predicates.has_permission('blog-post')) def send_linkbacks(self, id, **kw): trackbacks = kw.pop('trackbacks', '') blog = DBSession.query(Blog).get(1) post = DBSession.query(Post).get(id) flashes = [] uris = session.pop('linkback_uris', []) for (uri, title, lbs) in uris: b64uri = b64encode(uri) action = kw.pop(b64uri, None) if not action: continue type, target = action.split(':',1) for lb in lbs: if (isinstance(lb, TrackBackURI) and type=='TrackBack' and target==lb.uri): msg = lb.send(post.title, post.short_body, self.absolute_url(post), blog.title) flashes.append(msg) if (isinstance(lb, PingBackURI) and type=='PingBack' and target==lb.uri): msg = lb.send(self.absolute_url(post), uri) flashes.append(msg) for uri in trackbacks.split('\n'): uri = uri.strip() if not uri: continue lb = TrackBackURI(uri) msg = lb.send(post.title, post.short_body, self.absolute_url(post), blog.title) flashes.append(msg) flash('\n'.join(flashes)) redirect (self.url()) @expose() def trackback(self, id, url='', title='', excerpt='', blog_name=''): message = '' post = DBSession.query(Post).get(id) if not post: message = 'Post not found.' elif not post.allow_comments: message = 'Comments are closed on this post.' for lb in post.comments: if lb.url and lb.url == url: message = 'Trackback already registered.' if not message: lb = Comment() lb.approved = True lb.type = 'trackback' lb.post = post lb.url = url lb.title = title lb.name = blog_name lb.body = excerpt post.comments.append(lb) if message: error = 1 message = "%s\n" % message else: error = 0 return """ %s %s """ % (error, message)