# Quoins - A TurboGears blogging system. # Copyright (C) 2008-2009 James E. Blair # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from tg import controllers, expose, flash, require, validate, request, redirect, session from tg import TGController import tg import pylons from repoze.what import predicates import openid.consumer.consumer import openid.server.server import openid.extensions.sreg from openid.store.sqlstore import MySQLStore import MySQLdb import sqlalchemy.engine.url import types import os.path from utils import get_oid_connection from model import * class DecideController(object): def __init__(self, oid_controller): self.oid_controller = oid_controller @expose(template="genshi:quoinstemplates.oid_confirm") @require(predicates.not_anonymous()) def index(self, **kw): oid_request = request.environ['oid_request'] self.oid_controller._validate_identity(oid_request.identity) sreg_req = openid.extensions.sreg.SRegRequest.fromOpenIDRequest(oid_request) user = request.identity['user'] sr_required = [self.oid_controller.getSRegValue(user,x) for x in sreg_req.required] sr_optional = [self.oid_controller.getSRegValue(user,x) for x in sreg_req.optional] session['oid_request']=oid_request session.save() d = dict(openid = self.oid_controller, oid_return_verified = request.environ['oid_return_verified'], oid_request = oid_request, required = sr_required, optional = sr_optional, ) if self.oid_controller.quoins: blog = DBSession.query(Blog).get(1) d['blog'] = blog d['quoins'] = self.oid_controller.quoins return d class ResponseController(object): def __init__(self, oid_controller): self.oid_controller = oid_controller @expose() def index(self, **kw): webresponse = request.environ['oid_webresponse'] pylons.response.status = webresponse.code pylons.response.headers.update(webresponse.headers) return webresponse.body class OpenIDController(TGController): sreg_friendly = { 'nickname': 'Nickname', 'email': 'Email address', 'fullname': 'Full name', 'dob': 'Date of birth', 'gender': 'Gender', 'postcode': 'Postal code', 'country': 'Country', 'language': 'Language', 'timezone': 'Time zone', } def __init__(self, *args, **kw): self.get_name_from_id = kw.pop('get_name_from_id', lambda x: x) self.path = kw.pop('path', '/') self.quoins = kw.pop('quoins', None) super(OpenIDController, self).__init__(*args, **kw) def url(self, obj=None): if obj is None: u = tg.url(self.path) elif isinstance(obj, basestring): if obj.startswith('/'): obj = obj[1:] u = tg.url(os.path.join(self.path, obj)) return u def absolute_url(self, obj): port = tg.config.get('server.webport', '') if port == '80': port = '' else: port = ':'+port return 'http://%s%s%s'%(tg.config.get('server.webhost'), port, self.url(obj)) def getSRegValue(self, user, field): val = None if field=='nickname': val = user.user_name if field=='fullname': val = user.display_name if field=='email': val = user.email_address return (field, self.sreg_friendly[field], val) def isAuthorized(self, id, trust_root): if not request.identity: return False name = self.get_name_from_id(id) if request.identity['user'].user_name == name: # check to see if the user has set preferences for this trust # root, and return True. also, sreg. XXX return False return False def _validate_identity(self, id): name = self.get_name_from_id(id) if request.identity['user'].user_name != name: raise Exception("Not your ID") @expose() def finish(self, **kw): oid_request=session['oid_request'] self._validate_identity(oid_request.identity) sreg_req = openid.extensions.sreg.SRegRequest.fromOpenIDRequest(oid_request) user = request.identity['user'] store = MySQLStore(get_oid_connection()) oserver = openid.server.server.Server(store, self.absolute_url('/server')) data = {} for field in sreg_req.required+sreg_req.optional: if kw.has_key(field): data[field]=self.getSRegValue(user, field)[2] sreg_resp = openid.extensions.sreg.SRegResponse.extractResponse(sreg_req, data) if kw.get('continue', None): response = oid_request.answer(True) response.addExtension(sreg_resp) else: response = oid_request.answer(False) webresponse = oserver.encodeResponse(response) pylons.response.status = webresponse.code pylons.response.headers.update(webresponse.headers) del session['oid_request'] session.save() return webresponse.body @expose() def lookup(self, *remainder): store = MySQLStore(get_oid_connection()) oserver = openid.server.server.Server(store, self.absolute_url('/server')) oid_request = oserver.decodeRequest(request.params) if not oid_request: raise Exception("This does not appear to be an OpenID request") request.environ['oid_request']=oid_request request.environ['oid_return_verified']='not verified' if oid_request.mode in ['checkid_immediate', 'checkid_setup']: if hasattr(oid_request, 'returnToVerified'): if not oid_request.trustRootValid(): raise Exception("Not trusted") try: if not oid_request.returnToVerified(): raise Exception("Not trusted") request.environ['oid_return_verified']='verified' except: pass else: if not (oid_request.trustRootValid()): raise Exception("Not trusted") if self.isAuthorized(oid_request.identity, oid_request.trust_root): response = oid_request.answer(True) elif oid_request.immediate: response = oid_request.answer(False) else: return (DecideController(self), ()) else: response = oserver.handleRequest(oid_request) webresponse = oserver.encodeResponse(response) request.environ['oid_webresponse']=webresponse return (ResponseController(self), ())