# -*- test-case-name: openid.test.test_discover -*- """Functions to discover OpenID endpoints from identifiers. """ __all__ = [ 'DiscoveryFailure', 'OPENID_1_0_NS', 'OPENID_1_0_TYPE', 'OPENID_1_1_TYPE', 'OPENID_2_0_TYPE', 'OPENID_IDP_2_0_TYPE', 'OpenIDServiceEndpoint', 'discover', ] import urlparse from openid import oidutil, fetchers, urinorm from openid import yadis from openid.yadis.etxrd import nsTag, XRDSError, XRD_NS_2_0 from openid.yadis.services import applyFilter as extractServices from openid.yadis.discover import discover as yadisDiscover from openid.yadis.discover import DiscoveryFailure from openid.yadis import xrires, filters from openid.yadis import xri from openid.consumer import html_parse OPENID_1_0_NS = 'http://openid.net/xmlns/1.0' OPENID_IDP_2_0_TYPE = 'http://specs.openid.net/auth/2.0/server' OPENID_2_0_TYPE = 'http://specs.openid.net/auth/2.0/signon' OPENID_1_1_TYPE = 'http://openid.net/signon/1.1' OPENID_1_0_TYPE = 'http://openid.net/signon/1.0' from openid.message import OPENID1_NS as OPENID_1_0_MESSAGE_NS from openid.message import OPENID2_NS as OPENID_2_0_MESSAGE_NS class OpenIDServiceEndpoint(object): """Object representing an OpenID service endpoint. @ivar identity_url: the verified identifier. @ivar canonicalID: For XRI, the persistent identifier. """ # OpenID service type URIs, listed in order of preference. The # ordering of this list affects yadis and XRI service discovery. openid_type_uris = [ OPENID_IDP_2_0_TYPE, OPENID_2_0_TYPE, OPENID_1_1_TYPE, OPENID_1_0_TYPE, ] def __init__(self): self.claimed_id = None self.server_url = None self.type_uris = [] self.local_id = None self.canonicalID = None self.used_yadis = False # whether this came from an XRDS self.display_identifier = None def usesExtension(self, extension_uri): return extension_uri in self.type_uris def preferredNamespace(self): if (OPENID_IDP_2_0_TYPE in self.type_uris or OPENID_2_0_TYPE in self.type_uris): return OPENID_2_0_MESSAGE_NS else: return OPENID_1_0_MESSAGE_NS def supportsType(self, type_uri): """Does this endpoint support this type? I consider C{/server} endpoints to implicitly support C{/signon}. """ return ( (type_uri in self.type_uris) or (type_uri == OPENID_2_0_TYPE and self.isOPIdentifier()) ) def getDisplayIdentifier(self): """Return the display_identifier if set, else return the claimed_id. """ if self.display_identifier is not None: return self.display_identifier if self.claimed_id is None: return None else: return urlparse.urldefrag(self.claimed_id)[0] def compatibilityMode(self): return self.preferredNamespace() != OPENID_2_0_MESSAGE_NS def isOPIdentifier(self): return OPENID_IDP_2_0_TYPE in self.type_uris def parseService(self, yadis_url, uri, type_uris, service_element): """Set the state of this object based on the contents of the service element.""" self.type_uris = type_uris self.server_url = uri self.used_yadis = True if not self.isOPIdentifier(): # XXX: This has crappy implications for Service elements # that contain both 'server' and 'signon' Types. But # that's a pathological configuration anyway, so I don't # think I care. self.local_id = findOPLocalIdentifier(service_element, self.type_uris) self.claimed_id = yadis_url def getLocalID(self): """Return the identifier that should be sent as the openid.identity parameter to the server.""" # I looked at this conditional and thought "ah-hah! there's the bug!" # but Python actually makes that one big expression somehow, i.e. # "x is x is x" is not the same thing as "(x is x) is x". # That's pretty weird, dude. -- kmt, 1/07 if (self.local_id is self.canonicalID is None): return self.claimed_id else: return self.local_id or self.canonicalID def fromBasicServiceEndpoint(cls, endpoint): """Create a new instance of this class from the endpoint object passed in. @return: None or OpenIDServiceEndpoint for this endpoint object""" type_uris = endpoint.matchTypes(cls.openid_type_uris) # If any Type URIs match and there is an endpoint URI # specified, then this is an OpenID endpoint if type_uris and endpoint.uri is not None: openid_endpoint = cls() openid_endpoint.parseService( endpoint.yadis_url, endpoint.uri, endpoint.type_uris, endpoint.service_element) else: openid_endpoint = None return openid_endpoint fromBasicServiceEndpoint = classmethod(fromBasicServiceEndpoint) def fromHTML(cls, uri, html): """Parse the given document as HTML looking for an OpenID @rtype: [OpenIDServiceEndpoint] """ discovery_types = [ (OPENID_2_0_TYPE, 'openid2.provider', 'openid2.local_id'), (OPENID_1_1_TYPE, 'openid.server', 'openid.delegate'), ] link_attrs = html_parse.parseLinkAttrs(html) services = [] for type_uri, op_endpoint_rel, local_id_rel in discovery_types: op_endpoint_url = html_parse.findFirstHref( link_attrs, op_endpoint_rel) if op_endpoint_url is None: continue service = cls() service.claimed_id = uri service.local_id = html_parse.findFirstHref( link_attrs, local_id_rel) service.server_url = op_endpoint_url service.type_uris = [type_uri] services.append(service) return services fromHTML = classmethod(fromHTML) def fromXRDS(cls, uri, xrds): """Parse the given document as XRDS looking for OpenID services. @rtype: [OpenIDServiceEndpoint] @raises XRDSError: When the XRDS does not parse. @since: 2.1.0 """ return extractServices(uri, xrds, cls) fromXRDS = classmethod(fromXRDS) def fromDiscoveryResult(cls, discoveryResult): """Create endpoints from a DiscoveryResult. @type discoveryResult: L{DiscoveryResult} @rtype: list of L{OpenIDServiceEndpoint} @raises XRDSError: When the XRDS does not parse. @since: 2.1.0 """ if discoveryResult.isXRDS(): method = cls.fromXRDS else: method = cls.fromHTML return method(discoveryResult.normalized_uri, discoveryResult.response_text) fromDiscoveryResult = classmethod(fromDiscoveryResult) def fromOPEndpointURL(cls, op_endpoint_url): """Construct an OP-Identifier OpenIDServiceEndpoint object for a given OP Endpoint URL @param op_endpoint_url: The URL of the endpoint @rtype: OpenIDServiceEndpoint """ service = cls() service.server_url = op_endpoint_url service.type_uris = [OPENID_IDP_2_0_TYPE] return service fromOPEndpointURL = classmethod(fromOPEndpointURL) def __str__(self): return ("<%s.%s " "server_url=%r " "claimed_id=%r " "local_id=%r " "canonicalID=%r " "used_yadis=%s " ">" % (self.__class__.__module__, self.__class__.__name__, self.server_url, self.claimed_id, self.local_id, self.canonicalID, self.used_yadis)) def findOPLocalIdentifier(service_element, type_uris): """Find the OP-Local Identifier for this xrd:Service element. This considers openid:Delegate to be a synonym for xrd:LocalID if both OpenID 1.X and OpenID 2.0 types are present. If only OpenID 1.X is present, it returns the value of openid:Delegate. If only OpenID 2.0 is present, it returns the value of xrd:LocalID. If there is more than one LocalID tag and the values are different, it raises a DiscoveryFailure. This is also triggered when the xrd:LocalID and openid:Delegate tags are different. @param service_element: The xrd:Service element @type service_element: ElementTree.Node @param type_uris: The xrd:Type values present in this service element. This function could extract them, but higher level code needs to do that anyway. @type type_uris: [str] @raises DiscoveryFailure: when discovery fails. @returns: The OP-Local Identifier for this service element, if one is present, or None otherwise. @rtype: str or unicode or NoneType """ # XXX: Test this function on its own! # Build the list of tags that could contain the OP-Local Identifier local_id_tags = [] if (OPENID_1_1_TYPE in type_uris or OPENID_1_0_TYPE in type_uris): local_id_tags.append(nsTag(OPENID_1_0_NS, 'Delegate')) if OPENID_2_0_TYPE in type_uris: local_id_tags.append(nsTag(XRD_NS_2_0, 'LocalID')) # Walk through all the matching tags and make sure that they all # have the same value local_id = None for local_id_tag in local_id_tags: for local_id_element in service_element.findall(local_id_tag): if local_id is None: local_id = local_id_element.text elif local_id != local_id_element.text: format = 'More than one %r tag found in one service element' message = format % (local_id_tag,) raise DiscoveryFailure(message, None) return local_id def normalizeURL(url): """Normalize a URL, converting normalization failures to DiscoveryFailure""" try: normalized = urinorm.urinorm(url) except ValueError, why: raise DiscoveryFailure('Normalizing identifier: %s' % (why[0],), None) else: return urlparse.urldefrag(normalized)[0] def normalizeXRI(xri): """Normalize an XRI, stripping its scheme if present""" if xri.startswith("xri://"): xri = xri[6:] return xri def arrangeByType(service_list, preferred_types): """Rearrange service_list in a new list so services are ordered by types listed in preferred_types. Return the new list.""" def enumerate(elts): """Return an iterable that pairs the index of an element with that element. For Python 2.2 compatibility""" return zip(range(len(elts)), elts) def bestMatchingService(service): """Return the index of the first matching type, or something higher if no type matches. This provides an ordering in which service elements that contain a type that comes earlier in the preferred types list come before service elements that come later. If a service element has more than one type, the most preferred one wins. """ for i, t in enumerate(preferred_types): if preferred_types[i] in service.type_uris: return i return len(preferred_types) # Build a list with the service elements in tuples whose # comparison will prefer the one with the best matching service prio_services = [(bestMatchingService(s), orig_index, s) for (orig_index, s) in enumerate(service_list)] prio_services.sort() # Now that the services are sorted by priority, remove the sort # keys from the list. for i in range(len(prio_services)): prio_services[i] = prio_services[i][2] return prio_services def getOPOrUserServices(openid_services): """Extract OP Identifier services. If none found, return the rest, sorted with most preferred first according to OpenIDServiceEndpoint.openid_type_uris. openid_services is a list of OpenIDServiceEndpoint objects. Returns a list of OpenIDServiceEndpoint objects.""" op_services = arrangeByType(openid_services, [OPENID_IDP_2_0_TYPE]) openid_services = arrangeByType(openid_services, OpenIDServiceEndpoint.openid_type_uris) return op_services or openid_services def discoverYadis(uri): """Discover OpenID services for a URI. Tries Yadis and falls back on old-style discovery if Yadis fails. @param uri: normalized identity URL @type uri: str @return: (claimed_id, services) @rtype: (str, list(OpenIDServiceEndpoint)) @raises DiscoveryFailure: when discovery fails. """ # Might raise a yadis.discover.DiscoveryFailure if no document # came back for that URI at all. I don't think falling back # to OpenID 1.0 discovery on the same URL will help, so don't # bother to catch it. response = yadisDiscover(uri) yadis_url = response.normalized_uri body = response.response_text try: openid_services = OpenIDServiceEndpoint.fromXRDS(yadis_url, body) except XRDSError: # Does not parse as a Yadis XRDS file openid_services = [] if not openid_services: # Either not an XRDS or there are no OpenID services. if response.isXRDS(): # if we got the Yadis content-type or followed the Yadis # header, re-fetch the document without following the Yadis # header, with no Accept header. return discoverNoYadis(uri) # Try to parse the response as HTML. # openid_services = OpenIDServiceEndpoint.fromHTML(yadis_url, body) return (yadis_url, getOPOrUserServices(openid_services)) def discoverXRI(iname): endpoints = [] iname = normalizeXRI(iname) try: canonicalID, services = xrires.ProxyResolver().query( iname, OpenIDServiceEndpoint.openid_type_uris) if canonicalID is None: raise XRDSError('No CanonicalID found for XRI %r' % (iname,)) flt = filters.mkFilter(OpenIDServiceEndpoint) for service_element in services: endpoints.extend(flt.getServiceEndpoints(iname, service_element)) except XRDSError: oidutil.log('xrds error on ' + iname) for endpoint in endpoints: # Is there a way to pass this through the filter to the endpoint # constructor instead of tacking it on after? endpoint.canonicalID = canonicalID endpoint.claimed_id = canonicalID endpoint.display_identifier = iname # FIXME: returned xri should probably be in some normal form return iname, getOPOrUserServices(endpoints) def discoverNoYadis(uri): http_resp = fetchers.fetch(uri) if http_resp.status not in (200, 206): raise DiscoveryFailure( 'HTTP Response status from identity URL host is not 200. ' 'Got status %r' % (http_resp.status,), http_resp) claimed_id = http_resp.final_url openid_services = OpenIDServiceEndpoint.fromHTML( claimed_id, http_resp.body) return claimed_id, openid_services def discoverURI(uri): parsed = urlparse.urlparse(uri) if parsed[0] and parsed[1]: if parsed[0] not in ['http', 'https']: raise DiscoveryFailure('URI scheme is not HTTP or HTTPS', None) else: uri = 'http://' + uri uri = normalizeURL(uri) claimed_id, openid_services = discoverYadis(uri) claimed_id = normalizeURL(claimed_id) return claimed_id, openid_services def discover(identifier): if xri.identifierScheme(identifier) == "XRI": return discoverXRI(identifier) else: return discoverURI(identifier)