"""An implementation of the OpenID Provider Authentication Policy Extension 1.0, Draft 5 @see: http://openid.net/developers/specs/ @since: 2.1.0 """ __all__ = [ 'Request', 'Response', 'ns_uri', 'AUTH_PHISHING_RESISTANT', 'AUTH_MULTI_FACTOR', 'AUTH_MULTI_FACTOR_PHYSICAL', 'LEVELS_NIST', 'LEVELS_JISA', ] from openid.extension import Extension import warnings import re ns_uri = "http://specs.openid.net/extensions/pape/1.0" AUTH_MULTI_FACTOR_PHYSICAL = \ 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical' AUTH_MULTI_FACTOR = \ 'http://schemas.openid.net/pape/policies/2007/06/multi-factor' AUTH_PHISHING_RESISTANT = \ 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant' AUTH_NONE = \ 'http://schemas.openid.net/pape/policies/2007/06/none' TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$') LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf' LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html' class PAPEExtension(Extension): _default_auth_level_aliases = { 'nist': LEVELS_NIST, 'jisa': LEVELS_JISA, } def __init__(self): self.auth_level_aliases = self._default_auth_level_aliases.copy() def _addAuthLevelAlias(self, auth_level_uri, alias=None): """Add an auth level URI alias to this request. @param auth_level_uri: The auth level URI to send in the request. @param alias: The namespace alias to use for this auth level in this message. May be None if the alias is not important. """ if alias is None: try: alias = self._getAlias(auth_level_uri) except KeyError: alias = self._generateAlias() else: existing_uri = self.auth_level_aliases.get(alias) if existing_uri is not None and existing_uri != auth_level_uri: raise KeyError('Attempting to redefine alias %r from %r to %r', alias, existing_uri, auth_level_uri) self.auth_level_aliases[alias] = auth_level_uri def _generateAlias(self): """Return an unused auth level alias""" for i in xrange(1000): alias = 'cust%d' % (i,) if alias not in self.auth_level_aliases: return alias raise RuntimeError('Could not find an unused alias (tried 1000!)') def _getAlias(self, auth_level_uri): """Return the alias for the specified auth level URI. @raises KeyError: if no alias is defined """ for (alias, existing_uri) in self.auth_level_aliases.iteritems(): if auth_level_uri == existing_uri: return alias raise KeyError(auth_level_uri) class Request(PAPEExtension): """A Provider Authentication Policy request, sent from a relying party to a provider @ivar preferred_auth_policies: The authentication policies that the relying party prefers @type preferred_auth_policies: [str] @ivar max_auth_age: The maximum time, in seconds, that the relying party wants to allow to have elapsed before the user must re-authenticate @type max_auth_age: int or NoneType @ivar preferred_auth_level_types: Ordered list of authentication level namespace URIs @type preferred_auth_level_types: [str] """ ns_alias = 'pape' def __init__(self, preferred_auth_policies=None, max_auth_age=None, preferred_auth_level_types=None): super(Request, self).__init__() if preferred_auth_policies is None: preferred_auth_policies = [] self.preferred_auth_policies = preferred_auth_policies self.max_auth_age = max_auth_age self.preferred_auth_level_types = [] if preferred_auth_level_types is not None: for auth_level in preferred_auth_level_types: self.addAuthLevel(auth_level) def __nonzero__(self): return bool(self.preferred_auth_policies or self.max_auth_age is not None or self.preferred_auth_level_types) def addPolicyURI(self, policy_uri): """Add an acceptable authentication policy URI to this request This method is intended to be used by the relying party to add acceptable authentication types to the request. @param policy_uri: The identifier for the preferred type of authentication. @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies """ if policy_uri not in self.preferred_auth_policies: self.preferred_auth_policies.append(policy_uri) def addAuthLevel(self, auth_level_uri, alias=None): self._addAuthLevelAlias(auth_level_uri, alias) if auth_level_uri not in self.preferred_auth_level_types: self.preferred_auth_level_types.append(auth_level_uri) def getExtensionArgs(self): """@see: C{L{Extension.getExtensionArgs}} """ ns_args = { 'preferred_auth_policies':' '.join(self.preferred_auth_policies), } if self.max_auth_age is not None: ns_args['max_auth_age'] = str(self.max_auth_age) if self.preferred_auth_level_types: preferred_types = [] for auth_level_uri in self.preferred_auth_level_types: alias = self._getAlias(auth_level_uri) ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri preferred_types.append(alias) ns_args['preferred_auth_level_types'] = ' '.join(preferred_types) return ns_args def fromOpenIDRequest(cls, request): """Instantiate a Request object from the arguments in a C{checkid_*} OpenID message """ self = cls() args = request.message.getArgs(self.ns_uri) is_openid1 = request.message.isOpenID1() if args == {}: return None self.parseExtensionArgs(args, is_openid1) return self fromOpenIDRequest = classmethod(fromOpenIDRequest) def parseExtensionArgs(self, args, is_openid1, strict=False): """Set the state of this request to be that expressed in these PAPE arguments @param args: The PAPE arguments without a namespace @param strict: Whether to raise an exception if the input is out of spec or otherwise malformed. If strict is false, malformed input will be ignored. @param is_openid1: Whether the input should be treated as part of an OpenID1 request @rtype: None @raises ValueError: When the max_auth_age is not parseable as an integer """ # preferred_auth_policies is a space-separated list of policy URIs self.preferred_auth_policies = [] policies_str = args.get('preferred_auth_policies') if policies_str: for uri in policies_str.split(' '): if uri not in self.preferred_auth_policies: self.preferred_auth_policies.append(uri) # max_auth_age is base-10 integer number of seconds max_auth_age_str = args.get('max_auth_age') self.max_auth_age = None if max_auth_age_str: try: self.max_auth_age = int(max_auth_age_str) except ValueError: if strict: raise # Parse auth level information preferred_auth_level_types = args.get('preferred_auth_level_types') if preferred_auth_level_types: aliases = preferred_auth_level_types.strip().split() for alias in aliases: key = 'auth_level.ns.%s' % (alias,) try: uri = args[key] except KeyError: if is_openid1: uri = self._default_auth_level_aliases.get(alias) else: uri = None if uri is None: if strict: raise ValueError('preferred auth level %r is not ' 'defined in this message' % (alias,)) else: self.addAuthLevel(uri, alias) def preferredTypes(self, supported_types): """Given a list of authentication policy URIs that a provider supports, this method returns the subsequence of those types that are preferred by the relying party. @param supported_types: A sequence of authentication policy type URIs that are supported by a provider @returns: The sub-sequence of the supported types that are preferred by the relying party. This list will be ordered in the order that the types appear in the supported_types sequence, and may be empty if the provider does not prefer any of the supported authentication types. @returntype: [str] """ return filter(self.preferred_auth_policies.__contains__, supported_types) Request.ns_uri = ns_uri class Response(PAPEExtension): """A Provider Authentication Policy response, sent from a provider to a relying party @ivar auth_policies: List of authentication policies conformed to by this OpenID assertion, represented as policy URIs """ ns_alias = 'pape' def __init__(self, auth_policies=None, auth_time=None, auth_levels=None): super(Response, self).__init__() if auth_policies: self.auth_policies = auth_policies else: self.auth_policies = [] self.auth_time = auth_time self.auth_levels = {} if auth_levels is None: auth_levels = {} for uri, level in auth_levels.iteritems(): self.setAuthLevel(uri, level) def setAuthLevel(self, level_uri, level, alias=None): """Set the value for the given auth level type. @param level: string representation of an authentication level valid for level_uri @param alias: An optional namespace alias for the given auth level URI. May be omitted if the alias is not significant. The library will use a reasonable default for widely-used auth level types. """ self._addAuthLevelAlias(level_uri, alias) self.auth_levels[level_uri] = level def getAuthLevel(self, level_uri): """Return the auth level for the specified auth level identifier @returns: A string that should map to the auth levels defined for the auth level type @raises KeyError: If the auth level type is not present in this message """ return self.auth_levels[level_uri] def _getNISTAuthLevel(self): try: return int(self.getAuthLevel(LEVELS_NIST)) except KeyError: return None nist_auth_level = property( _getNISTAuthLevel, doc="Backward-compatibility accessor for the NIST auth level") def addPolicyURI(self, policy_uri): """Add a authentication policy to this response This method is intended to be used by the provider to add a policy that the provider conformed to when authenticating the user. @param policy_uri: The identifier for the preferred type of authentication. @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies """ if policy_uri == AUTH_NONE: raise RuntimeError( 'To send no policies, do not set any on the response.') if policy_uri not in self.auth_policies: self.auth_policies.append(policy_uri) def fromSuccessResponse(cls, success_response): """Create a C{L{Response}} object from a successful OpenID library response (C{L{openid.consumer.consumer.SuccessResponse}}) response message @param success_response: A SuccessResponse from consumer.complete() @type success_response: C{L{openid.consumer.consumer.SuccessResponse}} @rtype: Response or None @returns: A provider authentication policy response from the data that was supplied with the C{id_res} response or None if the provider sent no signed PAPE response arguments. """ self = cls() # PAPE requires that the args be signed. args = success_response.getSignedNS(self.ns_uri) is_openid1 = success_response.isOpenID1() # Only try to construct a PAPE response if the arguments were # signed in the OpenID response. If not, return None. if args is not None: self.parseExtensionArgs(args, is_openid1) return self else: return None def parseExtensionArgs(self, args, is_openid1, strict=False): """Parse the provider authentication policy arguments into the internal state of this object @param args: unqualified provider authentication policy arguments @param strict: Whether to raise an exception when bad data is encountered @returns: None. The data is parsed into the internal fields of this object. """ policies_str = args.get('auth_policies') if policies_str: auth_policies = policies_str.split(' ') elif strict: raise ValueError('Missing auth_policies') else: auth_policies = [] if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies): raise ValueError('Got some auth policies, as well as the special ' '"none" URI: %r' % (auth_policies,)) if 'none' in auth_policies: msg = '"none" used as a policy URI (see PAPE draft < 5)' if strict: raise ValueError(msg) else: warnings.warn(msg, stacklevel=2) auth_policies = [u for u in auth_policies if u not in ['none', AUTH_NONE]] self.auth_policies = auth_policies for (key, val) in args.iteritems(): if key.startswith('auth_level.'): alias = key[11:] # skip the already-processed namespace declarations if alias.startswith('ns.'): continue try: uri = args['auth_level.ns.%s' % (alias,)] except KeyError: if is_openid1: uri = self._default_auth_level_aliases.get(alias) else: uri = None if uri is None: if strict: raise ValueError( 'Undefined auth level alias: %r' % (alias,)) else: self.setAuthLevel(uri, val, alias) auth_time = args.get('auth_time') if auth_time: if TIME_VALIDATOR.match(auth_time): self.auth_time = auth_time elif strict: raise ValueError("auth_time must be in RFC3339 format") fromSuccessResponse = classmethod(fromSuccessResponse) def getExtensionArgs(self): """@see: C{L{Extension.getExtensionArgs}} """ if len(self.auth_policies) == 0: ns_args = { 'auth_policies': AUTH_NONE, } else: ns_args = { 'auth_policies':' '.join(self.auth_policies), } for level_type, level in self.auth_levels.iteritems(): alias = self._getAlias(level_type) ns_args['auth_level.ns.%s' % (alias,)] = level_type ns_args['auth_level.%s' % (alias,)] = str(level) if self.auth_time is not None: if not TIME_VALIDATOR.match(self.auth_time): raise ValueError('auth_time must be in RFC3339 format') ns_args['auth_time'] = self.auth_time return ns_args Response.ns_uri = ns_uri