# Quoins - A TurboGears blogging system.
# Copyright (C) 2008-2009 James E. Blair
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Database schema and mapped classes for the Quoins blog.

Tables are declared with SQLAlchemy ``Table`` objects on the metadata taken
from the TurboGears configuration, and plain Python classes are attached to
them with classical ``mapper()`` calls at the bottom of the module.
"""

from sqlalchemy import *
from sqlalchemy.orm import mapper, relation
from sqlalchemy import Table, ForeignKey, Column
from sqlalchemy.types import Integer, Unicode
#from sqlalchemy.orm import relation, backref

import tg
from datetime import datetime

#from bonglonglong.model import DeclarativeBase, metadata, DBSession

# The hosting application's model supplies the metadata, the session and the
# user class; they are read from the TurboGears config at import time.
metadata = tg.config['model'].metadata
DBSession = tg.config['model'].DBSession
TGUser = tg.config['sa_auth']['user_class']
tguser_table = TGUser.__table__


# Blog schema

blog_table = Table('blog', metadata,
    Column('id', Integer, primary_key=True),
    Column('title', Unicode(255)),
    Column('subtitle', Unicode(255)),
    Column('allow_comments', Boolean, default=True, nullable=False),
)

post_table = Table('post', metadata,
    Column('id', Integer, primary_key=True),
    Column('blog_id', Integer,
           ForeignKey('blog.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    Column('user_id', Integer,
           ForeignKey(tguser_table.c.user_id,
                      onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    Column('title', Unicode(255)),
    Column('teaser', TEXT),
    Column('body', TEXT),
    Column('created', DateTime, nullable=False, default=datetime.now),
    Column('allow_comments', Boolean, nullable=False),
    Column('published', Boolean, nullable=False, default=False, index=True),
)

media_table = Table('media', metadata,
    Column('id', Integer, primary_key=True),
    Column('post_id', Integer,
           ForeignKey('post.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    Column('name', String(255), nullable=False, index=True),
    Column('mimetype', String(255)),
    Column('data', BLOB),
    # A media name is unique within its post.
    UniqueConstraint('post_id', 'name'),
)

tag_table = Table('tag', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', Unicode(255)),
)

# Association table for the many-to-many post <-> tag relation.
post_tag_table = Table('post_tag', metadata,
    Column('id', Integer, primary_key=True),
    Column('post_id', Integer,
           ForeignKey('post.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    Column('tag_id', Integer,
           ForeignKey('tag.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
)

comment_table = Table('comment', metadata,
    Column('id', Integer, primary_key=True),
    # Self-referential key: a comment may point at a parent comment.
    Column('parent_comment_id', Integer,
           ForeignKey('comment.id', onupdate="CASCADE", ondelete="CASCADE"),
           index=True),
    Column('post_id', Integer,
           ForeignKey('post.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    # Nullable: a comment does not have to belong to a registered user.
    Column('user_id', Integer,
           ForeignKey(tguser_table.c.user_id, onupdate="CASCADE"),
           index=True),
    Column('name', Unicode(255)),
    Column('openid', String(255)),
    Column('url', String(255)),
    Column('title', Unicode(255)),
    Column('body', TEXT),
    Column('created', DateTime, nullable=False, default=datetime.now),
    Column('approved', Boolean, nullable=False, default=False, index=True),
)

linkback_table = Table('linkback', metadata,
    Column('id', Integer, primary_key=True),
    Column('post_id', Integer,
           ForeignKey('post.id', onupdate="CASCADE", ondelete="CASCADE"),
           nullable=False, index=True),
    Column('url', String(255)),
    Column('title', Unicode(255)),
    Column('body', Unicode(255)),
    Column('name', Unicode(255)),
    Column('created', DateTime, nullable=False, default=datetime.now),
)


class Blog(object):
    """A blog, mapped to ``blog_table``."""

    def get_tags(self):
        """Return every ``Tag`` in the database."""
        # XXX this assumes that only one blog exists in the schema
        return DBSession.query(Tag).all()
    tags = property(get_tags)

    def get_users(self):
        """Return the users that hold the 'blog-post' permission."""
        return [user for user in TGUser.query.all()
                if any(p.permission_name == 'blog-post'
                       for p in user.permissions)]
    authors = property(get_users)

    def getPostsByTag(self, tagname):
        """Return this blog's published posts tagged with *tagname*."""
        # Query through DBSession (as get_tags does) rather than relying on
        # a ``query`` attribute, which the mapper() calls in this module do
        # not install on Post.
        # ``== True`` builds a SQL expression here; it is not a Python
        # truthiness test and must stay as written.
        return DBSession.query(Post).filter(
            and_(post_table.c.blog_id==self.id,
                 post_table.c.id==post_tag_table.c.post_id,
                 post_tag_table.c.tag_id==tag_table.c.id,
                 tag_table.c.name==tagname,
                 post_table.c.published==True)).all()

    def getYears(self):
        """Return ``(year, post_count)`` pairs for published posts.

        The list is ordered from the most recent year to the oldest.
        """
        counts = {}
        for p in self.published_posts:
            counts[p.created.year] = counts.get(p.created.year, 0) + 1
        # sorted() with a key works on both Python 2 and 3; the previous
        # items().sort(cmp) form only works on Python 2.
        return sorted(counts.items(), key=lambda item: item[0], reverse=True)

    def getPostsByDate(self, year=None, month=None, day=None):
        """Return published posts matching the given date components.

        Each of *year*, *month* and *day* is only applied as a filter when
        it is truthy, so omitted components match any value.
        """
        return [p for p in self.published_posts
                if not (year and p.created.year != year)
                and not (month and p.created.month != month)
                and not (day and p.created.day != day)]

    def getPostsByAuthor(self, name):
        """Return published posts whose author's user_name equals *name*."""
        return [p for p in self.published_posts
                if p.author.user_name == name]


class Post(object):
    """A blog post, mapped to ``post_table``."""

    def get_teaser_or_body(self):
        """Return the teaser when one is set, otherwise the body."""
        if self.teaser:
            return self.teaser
        return self.body
    short_body = property(get_teaser_or_body)

    def get_teaser_and_body(self):
        """Return the teaser followed by the body, or just the body."""
        if self.teaser:
            return self.teaser + self.body
        return self.body
    long_body = property(get_teaser_and_body)

    def tag(self, name):
        """Attach the tag called *name*, creating the Tag if necessary."""
        t = DBSession.query(Tag).filter_by(name=name).first()
        if not t:
            t = Tag()
            t.name = name
        self.tags.append(t)

    def untag(self, name):
        """Detach the tag called *name* from this post.

        The Tag row itself is deleted when fewer than two posts use it.
        Nothing happens if no such tag exists or if this post does not
        carry it.
        """
        t = DBSession.query(Tag).filter_by(name=name).first()
        # Guard against unknown tags (previously an AttributeError on None)
        # and tags that belong only to other posts (previously such a tag
        # could be deleted before list.remove() raised ValueError).
        if t is None or t not in self.tags:
            return
        if len(t.posts) < 2:
            # The original called an undefined ``session`` here, which
            # raised NameError; the module's session is DBSession.
            DBSession.delete(t)
        self.tags.remove(t)

    def get_comments_and_linkbacks(self, trackbacks=1, pingbacks=0):
        """Return approved comments and selected linkbacks, oldest first.

        A linkback with a body is included when *trackbacks* is truthy; a
        linkback without a body is included when *pingbacks* is truthy.
        The ``comments_and_links`` property uses the defaults.
        """
        # Copy so the mapped collection itself is never mutated.
        objects = list(self.approved_comments)
        objects.extend(
            x for x in self.linkbacks
            if (trackbacks and x.body) or (pingbacks and not x.body))
        objects.sort(key=lambda entry: entry.created)
        return objects
    comments_and_links = property(get_comments_and_linkbacks)


class Media(object):
    """A named blob attached to a post, mapped to ``media_table``."""
    pass


class Tag(object):
    """A tag, mapped to ``tag_table``."""
    pass


class BaseComment(object):
    """Shared author-name/author-url logic for comments and linkbacks."""

    def get_author_name(self):
        """Return a display name for whoever wrote this entry.

        Prefers the related author's display_name, then the stored name,
        and falls back to 'Anonymous'.
        """
        # hasattr() is needed because not every subclass is mapped with an
        # ``author`` relation (LinkBack is mapped without one).
        if hasattr(self, 'author') and self.author:
            return self.author.display_name
        if self.name:
            return self.name
        return 'Anonymous'
    author_name = property(get_author_name)

    def get_author_url(self):
        """Return the author's url, the stored url, or None."""
        if hasattr(self, 'author') and self.author:
            return self.author.url
        if self.url:
            return self.url
        return None
    author_url = property(get_author_url)


class Comment(BaseComment):
    """A reader comment, mapped to ``comment_table``."""
    pass


class LinkBack(BaseComment):
    """A linkback entry, mapped to ``linkback_table``."""
    pass


# Classical mappings.  ``== True`` / ``== False`` below build SQL expressions
# and must not be rewritten as Python truthiness tests.

mapper(Blog, blog_table,
       properties=dict(
           posts=relation(Post, order_by=desc(post_table.c.created)),
           published_posts=relation(
               Post,
               primaryjoin=and_(post_table.c.blog_id==blog_table.c.id,
                                post_table.c.published==True),
               order_by=desc(post_table.c.created)),
           unpublished_posts=relation(
               Post,
               primaryjoin=and_(post_table.c.blog_id==blog_table.c.id,
                                post_table.c.published==False),
               order_by=desc(post_table.c.created)),
           # Unapproved comments across all of the blog's posts, reached by
           # using the post table as the secondary.
           unapproved_comments=relation(
               Comment,
               secondary=post_table,
               primaryjoin=post_table.c.blog_id==blog_table.c.id,
               secondaryjoin=and_(comment_table.c.post_id==post_table.c.id,
                                  comment_table.c.approved==False))))

mapper(Tag, tag_table,
       order_by=desc(tag_table.c.name))

mapper(Media, media_table,
       properties=dict(post=relation(Post)))

mapper(Post, post_table,
       order_by=desc(post_table.c.created),
       properties=dict(
           blog=relation(Blog),
           author=relation(TGUser),
           tags=relation(Tag, secondary=post_tag_table, backref='posts'),
           comments=relation(Comment, cascade="all, delete-orphan"),
           approved_comments=relation(
               Comment,
               primaryjoin=and_(comment_table.c.post_id==post_table.c.id,
                                comment_table.c.approved==True)),
           unapproved_comments=relation(
               Comment,
               primaryjoin=and_(comment_table.c.post_id==post_table.c.id,
                                comment_table.c.approved==False)),
           media=relation(Media),
           linkbacks=relation(LinkBack)))

mapper(Comment, comment_table,
       order_by=comment_table.c.created,
       properties=dict(post=relation(Post),
                       author=relation(TGUser)))

mapper(LinkBack, linkback_table,
       properties=dict(post=relation(Post)))


def init_model(engine):
    """Call me before using any of the tables or classes in the model."""
    # NOTE(review): sessionmaker, scoped_session, declarative_base and
    # ZopeTransactionExtension are not imported by name in this module and
    # do not appear to be provided by ``from sqlalchemy import *`` --
    # confirm; if they are not in scope this function raises NameError.
    # NOTE(review): DBSession, DeclarativeBase and metadata assigned below
    # are locals; the module-level DBSession/metadata are not rebound.
    maker = sessionmaker(autoflush=True, autocommit=False,
                         extension=ZopeTransactionExtension())
    DBSession = scoped_session(maker)
    DeclarativeBase = declarative_base()
    metadata = DeclarativeBase.metadata
    DBSession.configure(bind=engine)
    return DBSession