#!/usr/bin/env python
"""Example OpenID server built on the Python OpenID library.

This is a small, self-contained demonstration: it runs a single-process
HTTP server, keeps all login and approval state in memory, and lets a
visitor "log in" under any name without a password.  It is meant for
experimenting with OpenID consumers, not for production use.
"""

__copyright__ = 'Copyright 2005-2008, Janrain, Inc.'

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from urlparse import urlparse

import time
import Cookie
import cgi
import cgitb
import sys
import urllib


def quoteattr(s):
    """Return ``s`` HTML-escaped and wrapped in double quotes.

    The result is a complete attribute value (quotes included), so it is
    used as ``'<a href=%s>' % quoteattr(url)``.
    """
    qs = cgi.escape(s, 1)
    return '"%s"' % (qs,)


try:
    import openid
except ImportError:
    sys.stderr.write("""
Failed to import the OpenID library. In order to use this example, you
must either install the library (see INSTALL in the root of the
distribution) or else add the library to python's import path (the
PYTHONPATH environment variable).

For more information, see the README in the root of the library
distribution.""")
    sys.exit(1)

from openid.extensions import sreg
from openid.server import server
from openid.store.filestore import FileOpenIDStore
from openid.consumer import discover


class OpenIDHTTPServer(HTTPServer):
    """
    http server that contains a reference to an OpenID Server and
    knows its base URL.
    """

    def __init__(self, *args, **kwargs):
        HTTPServer.__init__(self, *args, **kwargs)

        # The port is left out of the base URL only for the default
        # HTTP port.
        if self.server_port != 80:
            self.base_url = ('http://%s:%s/' %
                             (self.server_name, self.server_port))
        else:
            self.base_url = 'http://%s/' % (self.server_name,)

        # OpenID server object; installed later via setOpenIDServer().
        self.openid = None
        # Maps (identity URL, trust root) -> 'always' for remembered
        # approvals.
        self.approved = {}
        # Maps user name -> the checkid request awaiting that user's
        # decision.
        self.lastCheckIDRequest = {}

    def setOpenIDServer(self, oidserver):
        """Install the OpenID server object used to handle requests."""
        self.openid = oidserver


class ServerHandler(BaseHTTPRequestHandler):
    """Request handler implementing the example's pages and endpoint.

    ``self.user`` holds the name taken from the ``user`` cookie, or None
    when the visitor is not logged in.
    """

    def __init__(self, *args, **kwargs):
        # Set before the base constructor runs, because the base class
        # handles the request from inside its constructor.
        self.user = None
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def _parseQuery(self, encoded):
        """Decode a urlencoded string into a dict (last value wins)."""
        query = {}
        for k, v in cgi.parse_qsl(encoded):
            query[k] = v
        return query

    def _reportException(self):
        """Send a 500 response containing a cgitb traceback page."""
        self.send_response(500)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(cgitb.html(sys.exc_info(), context=10))

    def do_GET(self):
        """Dispatch a GET request to the page matching its path."""
        try:
            self.parsed_uri = urlparse(self.path)
            self.query = self._parseQuery(self.parsed_uri[4])

            self.setUser()

            # Dispatch is case-insensitive; note that the id and yadis
            # handlers receive the lowercased path.
            path = self.parsed_uri[2].lower()

            if path == '/':
                self.showMainPage()
            elif path == '/openidserver':
                self.serverEndPoint(self.query)
            elif path == '/login':
                self.showLoginPage('/', '/')
            elif path == '/loginsubmit':
                self.doLogin()
            elif path.startswith('/id/'):
                self.showIdPage(path)
            elif path.startswith('/yadis/'):
                self.showYadis(path[7:])
            elif path == '/serveryadis':
                self.showServerYadis()
            else:
                self.send_response(404)
                self.end_headers()

        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Top-level boundary: report anything else to the browser.
            self._reportException()

    def do_POST(self):
        """Dispatch a POST request (OpenID endpoint or decision form)."""
        try:
            self.parsed_uri = urlparse(self.path)

            self.setUser()
            # A missing Content-Length header is treated as an empty body.
            content_length = int(self.headers.get('Content-Length') or 0)
            post_data = self.rfile.read(content_length)

            self.query = self._parseQuery(post_data)

            path = self.parsed_uri[2]
            if path == '/openidserver':
                self.serverEndPoint(self.query)
            elif path == '/allow':
                self.handleAllow(self.query)
            else:
                self.send_response(404)
                self.end_headers()

        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Top-level boundary: report anything else to the browser.
            self._reportException()

    def handleAllow(self, query):
        """Process the approve/deny form posted from the decide page.

        ``query`` is the decoded form data.  A ``yes`` field approves the
        pending request (optionally switching user via ``login_as`` and
        remembering the trust root via ``remember``); a ``no`` field
        denies it.  Anything else produces a 400 error page.
        """
        # pretend this next bit is keying off the user's session or
        # something, right?
        request = self.server.lastCheckIDRequest.get(self.user)
        if request is None:
            self.showErrorPage(
                'There is no pending OpenID request to approve or deny.')
            return

        if 'yes' in query:
            if 'login_as' in query:
                self.user = query['login_as']

            if request.idSelect():
                if 'identifier' not in query:
                    self.showErrorPage('No identifier was selected.')
                    return
                identity = self.server.base_url + 'id/' + query['identifier']
            else:
                identity = request.identity

            trust_root = request.trust_root
            if query.get('remember', 'no') == 'yes':
                self.server.approved[(identity, trust_root)] = 'always'

            response = self.approved(request, identity)

        elif 'no' in query:
            response = request.answer(False)

        else:
            # Malformed form data is a client error, not a server bug.
            self.showErrorPage(
                cgi.escape('strange allow post.  %r' % (query,)))
            return

        self.displayResponse(response)

    def setUser(self):
        """Set ``self.user`` from the ``user`` cookie, when one is sent."""
        cookies = self.headers.get('Cookie')
        if cookies:
            morsel = Cookie.BaseCookie(cookies).get('user')
            if morsel:
                # The value was percent-quoted by writeUserHeader().
                self.user = urllib.unquote(morsel.value)

    def isAuthorized(self, identity_url, trust_root):
        """Return True if the current user has approved ``trust_root``.

        The user must be logged in, ``identity_url`` must be that user's
        identity URL on this server, and the pair must have been
        remembered as approved.
        """
        if self.user is None:
            return False

        if identity_url != self.server.base_url + 'id/' + self.user:
            return False

        key = (identity_url, trust_root)
        return self.server.approved.get(key) is not None

    def serverEndPoint(self, query):
        """Handle a request sent to the OpenID endpoint URL.

        ``query`` is the decoded request arguments.  Protocol errors are
        encoded back to the requester; a query that is not an OpenID
        request at all gets the informational "about" page.
        """
        try:
            request = self.server.openid.decodeRequest(query)
        except server.ProtocolError:
            # sys.exc_info() is used rather than binding the exception in
            # the except clause so the syntax parses on every Python
            # version.
            why = sys.exc_info()[1]
            self.displayResponse(why)
            return

        if request is None:
            # Display text indicating that this is an endpoint.
            self.showAboutPage()
            return

        if request.mode in ["checkid_immediate", "checkid_setup"]:
            self.handleCheckIDRequest(request)
        else:
            response = self.server.openid.handleRequest(request)
            self.displayResponse(response)

    def addSRegResponse(self, request, response):
        """Attach a Simple Registration extension response to ``response``."""
        sreg_req = sreg.SRegRequest.fromOpenIDRequest(request)

        # In a real application, this data would be user-specific,
        # and the user should be asked for permission to release
        # it.
        sreg_data = {
            'nickname': self.user,
            }

        sreg_resp = sreg.SRegResponse.extractResponse(sreg_req, sreg_data)
        response.addExtension(sreg_resp)

    def approved(self, request, identifier=None):
        """Build the positive response to ``request``.

        ``identifier`` is passed to ``request.answer`` as the identity to
        assert; None leaves the choice to the request itself.
        """
        response = request.answer(True, identity=identifier)
        self.addSRegResponse(request, response)
        return response

    def handleCheckIDRequest(self, request):
        """Answer a checkid request, or ask the user to decide.

        Already-approved requests are answered positively at once,
        immediate-mode requests are refused, and anything else is stored
        as pending while the decide page is shown.
        """
        is_authorized = self.isAuthorized(request.identity,
                                          request.trust_root)
        if is_authorized:
            response = self.approved(request)
            self.displayResponse(response)
        elif request.immediate:
            response = request.answer(False)
            self.displayResponse(response)
        else:
            self.server.lastCheckIDRequest[self.user] = request
            self.showDecidePage(request)

    def displayResponse(self, response):
        """Encode an OpenID response and send it as the HTTP response."""
        try:
            webresponse = self.server.openid.encodeResponse(response)
        except server.EncodingError:
            why = sys.exc_info()[1]
            text = why.response.encodeToKVForm()
            self.showErrorPage('<pre>%s</pre>' % cgi.escape(text))
            return

        self.send_response(webresponse.code)
        for header, value in webresponse.headers.iteritems():
            self.send_header(header, value)
        self.writeUserHeader()
        self.end_headers()

        if webresponse.body:
            self.wfile.write(webresponse.body)

    def doLogin(self):
        """Handle the login form: set or clear the user, then redirect.

        A missing ``success_to`` or ``fail_to`` field falls back to the
        main page.  A request with neither ``submit`` nor ``cancel``
        produces a 400 error page.
        """
        if 'submit' in self.query:
            if 'user' in self.query:
                self.user = self.query['user']
            else:
                # Submitting without a name logs the user out.
                self.user = None
            self.redirect(self.query.get('success_to', '/'))
        elif 'cancel' in self.query:
            self.redirect(self.query.get('fail_to', '/'))
        else:
            # Malformed form data is a client error, not a server bug.
            self.showErrorPage(
                cgi.escape('strange login %r' % (self.query,)))

    def redirect(self, url):
        """Send a 302 redirect to ``url`` (with the user cookie header)."""
        if '\r' in url or '\n' in url:
            # The target may come from the query string; a line break
            # would let it inject extra response headers.
            self.showErrorPage('Invalid redirect target.')
            return

        self.send_response(302)
        self.send_header('Location', url)
        self.writeUserHeader()

        self.end_headers()

    def writeUserHeader(self):
        """Send the Set-Cookie header that stores or clears the user."""
        if self.user is None:
            # An expiry date in 1970 tells the browser to drop the cookie.
            t1970 = time.gmtime(0)
            expires = time.strftime(
                'Expires=%a, %d-%b-%y %H:%M:%S GMT', t1970)
            self.send_header('Set-Cookie', 'user=;%s' % expires)
        else:
            # Percent-quote the name so that it can neither break the
            # cookie syntax nor inject headers; setUser() unquotes it.
            self.send_header(
                'Set-Cookie', 'user=%s' % urllib.quote(self.user, ''))

    def showAboutPage(self):
        """Show a page explaining that this URL is an OpenID endpoint."""
        endpoint_url = self.server.base_url + 'openidserver'

        def link(url):
            url_attr = quoteattr(url)
            url_text = cgi.escape(url)
            return '<a href=%s><code>%s</code></a>' % (url_attr, url_text)

        def term(url, text):
            return '<dt>%s</dt><dd>%s</dd>' % (link(url), text)

        resources = [
            (self.server.base_url, "This example server's home page"),
            ('http://www.openidenabled.com/',
             'An OpenID community Web site, home of this library'),
            ('http://www.openid.net/', 'the official OpenID Web site'),
            ]

        resource_markup = ''.join(
            [term(url, text) for url, text in resources])

        self.showPage(200, 'This is an OpenID server', msg="""\
        <p>%s is an OpenID server endpoint.</p>
        <p>For more information about OpenID, see:</p>
        <dl>
        %s
        </dl>
        """ % (link(endpoint_url), resource_markup,))

    def showErrorPage(self, error_message):
        """Show a 400 page.

        ``error_message`` is inserted into the page as HTML, so callers
        must escape any untrusted text themselves.
        """
        self.showPage(400, 'Error Processing Request', err='''\
        <p>%s</p>
        ''' % error_message)

    def showDecidePage(self, request):
        """Ask the user whether to approve the pending ``request``.

        Three variants are shown: identifier selection, a plain
        confirmation when the logged-in user owns the identity, and a
        confirmation that also logs in as the identity's owner.
        """
        id_url_base = self.server.base_url + 'id/'
        # XXX: This may break if there are any synonyms for id_url_base,
        # such as referring to it by IP address or a CNAME.
        if not (request.identity.startswith(id_url_base) or
                request.idSelect()):
            self.showErrorPage(cgi.escape(
                'Unexpected identity: %r'
                % ((request.identity, id_url_base),)))
            return
        expected_user = request.identity[len(id_url_base):]

        # Request-supplied values are escaped before they reach the HTML.
        trust_root_html = cgi.escape(request.trust_root)

        if request.idSelect():  # We are being asked to select an ID
            msg = '''\
            <p>A site has asked for your identity.  You may select an
            identifier by which you would like this site to know you.
            On a production site this would likely be a drop down list
            of pre-created accounts or have the facility to generate
            a random anonymous identifier.
            </p>
            '''
            fdata = {
                'id_url_base': cgi.escape(id_url_base),
                'trust_root': trust_root_html,
                }
            form = '''\
            <form method="POST" action="/allow">
            <table>
              <tr><td>Identity:</td>
                 <td>%(id_url_base)s<input type="text" name="identifier"></td></tr>
              <tr><td>Trust Root:</td><td>%(trust_root)s</td></tr>
            </table>
            <p>Allow this authentication to proceed?</p>
            <input type="checkbox" id="remember" name="remember" value="yes"
                /><label for="remember">Remember this
                decision</label><br />
            <input type="submit" name="yes" value="yes" />
            <input type="submit" name="no" value="no" />
            </form>
            ''' % fdata
        elif expected_user == self.user:
            msg = '''\
            <p>A new site has asked to confirm your identity.  If you
            approve, the site represented by the trust root below will
            be told that you control identity URL listed below. (If
            you are using a delegated identity, the site will take
            care of reversing the delegation on its own.)</p>
            '''

            fdata = {
                'identity': cgi.escape(request.identity),
                'trust_root': trust_root_html,
                }
            form = '''\
            <table>
              <tr><td>Identity:</td><td>%(identity)s</td></tr>
              <tr><td>Trust Root:</td><td>%(trust_root)s</td></tr>
            </table>
            <p>Allow this authentication to proceed?</p>
            <form method="POST" action="/allow">
              <input type="checkbox" id="remember" name="remember" value="yes"
                  /><label for="remember">Remember this
                  decision</label><br />
              <input type="submit" name="yes" value="yes" />
              <input type="submit" name="no" value="no" />
            </form>
            ''' % fdata
        else:
            mdata = {
                'expected_user': cgi.escape(expected_user),
                # self.user may be None here (nobody is logged in).
                'user': cgi.escape('%s' % (self.user,)),
                }
            msg = '''\
            <p>A site has asked for an identity belonging to
            %(expected_user)s, but you are logged in as %(user)s.  To
            log in as %(expected_user)s and approve the login request,
            hit OK below.  The "Remember this decision" checkbox
            applies only to the trust root decision.</p>
            ''' % mdata

            fdata = {
                'identity': cgi.escape(request.identity),
                'trust_root': trust_root_html,
                # quoteattr() supplies the surrounding quotes.
                'expected_user': quoteattr(expected_user),
                }
            form = '''\
            <table>
              <tr><td>Identity:</td><td>%(identity)s</td></tr>
              <tr><td>Trust Root:</td><td>%(trust_root)s</td></tr>
            </table>
            <p>Allow this authentication to proceed?</p>
            <form method="POST" action="/allow">
              <input type="checkbox" id="remember" name="remember" value="yes"
                  /><label for="remember">Remember this
                  decision</label><br />
              <input type="hidden" name="login_as" value=%(expected_user)s />
              <input type="submit" name="yes" value="yes" />
              <input type="submit" name="no" value="no" />
            </form>
            ''' % fdata

        self.showPage(200, 'Approve OpenID request?', msg=msg, form=form)

    def showIdPage(self, path):
        """Show the identity page for the ``/id/<user>`` request ``path``.

        The page carries the discovery tags pointing at this server and
        lists the trust roots remembered as approved for the identity.
        """
        # The path comes from the request URL, so everything derived from
        # it is escaped before being placed in the page.
        link_tag = '<link rel="openid.server" href=%s>' % \
            quoteattr(self.server.base_url + 'openidserver')
        yadis_loc_tag = '<meta http-equiv="x-xrds-location" content=%s>' % \
            quoteattr(self.server.base_url + 'yadis/' + path[4:])
        disco_tags = link_tag + yadis_loc_tag
        ident = self.server.base_url + path[1:]

        approved_trust_roots = []
        for (aident, trust_root) in self.server.approved.keys():
            if aident == ident:
                trs = '<li><tt>%s</tt></li>\n' % cgi.escape(trust_root)
                approved_trust_roots.append(trs)

        if approved_trust_roots:
            prepend = '<p>Approved trust roots:</p>\n<ul>\n'
            approved_trust_roots.insert(0, prepend)
            approved_trust_roots.append('</ul>\n')
            msg = ''.join(approved_trust_roots)
        else:
            msg = ''

        self.showPage(200, 'An Identity Page', head_extras=disco_tags,
                      msg='''\
        <p>This is an identity page for %s.</p>
        %s
        ''' % (cgi.escape(ident), msg))

    def showYadis(self, user):
        """Serve the XRDS discovery document for ``user``'s identity."""
        self.send_response(200)
        self.send_header('Content-type', 'application/xrds+xml')
        self.end_headers()

        endpoint_url = self.server.base_url + 'openidserver'
        user_url = self.server.base_url + 'id/' + user
        self.wfile.write("""\
<?xml version="1.0" encoding="UTF-8"?>
<xrds:XRDS
    xmlns:xrds="xri://$xrds"
    xmlns="xri://$xrd*($v*2.0)">
  <XRD>

    <Service priority="0">
      <Type>%s</Type>
      <Type>%s</Type>
      <URI>%s</URI>
      <LocalID>%s</LocalID>
    </Service>

  </XRD>
</xrds:XRDS>
""" % (discover.OPENID_2_0_TYPE, discover.OPENID_1_0_TYPE,
       cgi.escape(endpoint_url), cgi.escape(user_url)))

    def showServerYadis(self):
        """Serve the XRDS discovery document for the server itself."""
        self.send_response(200)
        self.send_header('Content-type', 'application/xrds+xml')
        self.end_headers()

        endpoint_url = self.server.base_url + 'openidserver'
        self.wfile.write("""\
<?xml version="1.0" encoding="UTF-8"?>
<xrds:XRDS
    xmlns:xrds="xri://$xrds"
    xmlns="xri://$xrd*($v*2.0)">
  <XRD>

    <Service priority="0">
      <Type>%s</Type>
      <URI>%s</URI>
    </Service>

  </XRD>
</xrds:XRDS>
""" % (discover.OPENID_IDP_2_0_TYPE, cgi.escape(endpoint_url),))

    def showMainPage(self):
        """Show the home page, including the visitor's login status."""
        yadis_tag = '<meta http-equiv="x-xrds-location" content=%s>' % \
            quoteattr(self.server.base_url + 'serveryadis')
        if self.user:
            openid_url = self.server.base_url + 'id/' + self.user
            user_message = """\
            <p>You are logged in as %s. Your OpenID identity URL is
            <tt><a href=%s>%s</a></tt>. Enter that URL at an OpenID
            consumer to test this server.</p>
            """ % (cgi.escape(self.user), quoteattr(openid_url),
                   cgi.escape(openid_url))
        else:
            user_message = """\
            <p>This server uses a cookie to remember who you are in
            order to simulate a standard Web user experience. You are
            not <a href="/login">logged in</a>.</p>
            """

        self.showPage(200, 'Main Page', head_extras=yadis_tag, msg='''\
        <p>This is a simple OpenID server implemented using the
        Python OpenID library.</p>

        %s

        <p>To use this server with a consumer, the consumer must be
        able to fetch HTTP pages from this web server. If this
        computer is behind a firewall, you will not be able to use
        OpenID consumers outside of the firewall with it.</p>

        <p>The URL for this server is <a href=%s><tt>%s</tt></a>.</p>
        ''' % (user_message, quoteattr(self.server.base_url),
               cgi.escape(self.server.base_url)))

    def showLoginPage(self, success_to, fail_to):
        """Show the login form.

        ``success_to`` and ``fail_to`` are the URLs the form asks
        doLogin() to redirect to after logging in or cancelling.
        """
        self.showPage(200, 'Login Page', form='''\
        <h2>Login</h2>
        <p>You may log in with any name. This server does not use
        passwords because it is just a sample of how to use the OpenID
        library.</p>
        <form method="GET" action="/loginsubmit">
          <input type="hidden" name="success_to" value=%s />
          <input type="hidden" name="fail_to" value=%s />
          <input type="text" name="user" value="" />
          <input type="submit" name="submit" value="Log In" />
          <input type="submit" name="cancel" value="Cancel" />
        </form>
        ''' % (quoteattr(success_to), quoteattr(fail_to)))

    def showPage(self, response_code, title,
                 head_extras='', msg=None, err=None, form=None):
        """Send a complete HTML page.

        ``response_code`` is the HTTP status and ``title`` the page title
        suffix.  ``head_extras``, ``err``, ``msg`` and ``form`` are HTML
        fragments inserted as-is (callers escape untrusted text); the
        last three are optional and appear in that order.
        """
        if self.user is None:
            user_link = '<a href="/login">not logged in</a>.'
        else:
            user_link = (
                'logged in as <a href=%s>%s</a>.<br />'
                '<a href="/loginsubmit?submit=true&amp;success_to=/login">'
                'Log out</a>'
                % (quoteattr('/id/' + self.user), cgi.escape(self.user)))

        body = ''

        if err is not None:
            body += '''\
            <div class="error">
              %s
            </div>
            ''' % err

        if msg is not None:
            body += '''\
            <div class="message">
              %s
            </div>
            ''' % msg

        if form is not None:
            body += '''\
            <div class="form">
              %s
            </div>
            ''' % form

        contents = {
            'title': 'Python OpenID Server Example - ' + title,
            'head_extras': head_extras,
            'body': body,
            'user_link': user_link,
            }

        self.send_response(response_code)
        self.writeUserHeader()
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        self.wfile.write('''<html>
  <head>
    <title>%(title)s</title>
    %(head_extras)s
  </head>
  <body>
    <table class="banner">
      <tr>
        <td class="leftbanner">
          <h1><a href="/">Python OpenID Server Example</a></h1>
        </td>
        <td class="userlink">
          You are %(user_link)s
        </td>
      </tr>
    </table>
%(body)s
  </body>
</html>
''' % contents)


def main(host, port, data_path):
    """Run the example server on ``host``:``port`` until interrupted.

    ``data_path`` is the directory handed to the file-based OpenID store.
    """
    addr = (host, port)
    httpserver = OpenIDHTTPServer(addr, ServerHandler)

    # Instantiate OpenID consumer store and OpenID consumer.  If you
    # were connecting to a database, you would create the database
    # connection and instantiate an appropriate store here.
    store = FileOpenIDStore(data_path)
    oidserver = server.Server(store, httpserver.base_url + 'openidserver')

    httpserver.setOpenIDServer(oidserver)

    # Single-argument print calls behave the same with or without the
    # print statement.
    print('Server running at:')
    print(httpserver.base_url)
    httpserver.serve_forever()


if __name__ == '__main__':
    host = 'localhost'
    data_path = 'sstore'
    port = 8000

    try:
        import optparse
    except ImportError:
        pass  # Use defaults (for Python 2.2)
    else:
        parser = optparse.OptionParser('Usage:\n %prog [options]')
        parser.add_option(
            '-d', '--data-path', dest='data_path', default=data_path,
            help='Data directory for storing OpenID consumer state. '
            'Defaults to "%default" in the current directory.')
        parser.add_option(
            '-p', '--port', dest='port', type='int', default=port,
            help='Port on which to listen for HTTP requests. '
            'Defaults to port %default.')
        parser.add_option(
            '-s', '--host', dest='host', default=host,
            help='Host on which to listen for HTTP requests. '
            'Also used for generating URLs. Defaults to %default.')

        options, args = parser.parse_args()
        if args:
            parser.error('Expected no arguments. Got %r' % args)

        host = options.host
        port = options.port
        data_path = options.data_path

    main(host, port, data_path)