# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import datetime
import struct
import uuid
from cryptography import fernet
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
from six.moves import map
from six.moves import urllib
from keystone.auth import plugins as auth_plugins
from keystone.common import utils as ks_utils
from keystone import exception
from keystone.i18n import _, _LI
from keystone.token import provider
from keystone.token.providers.fernet import utils
# Handle on the global oslo.config configuration object.
CONF = cfg.CONF
# Module-level logger named after this module.
LOG = log.getLogger(__name__)
# Fernet byte indexes as computed by pypi/keyless_fernet and defined in
# https://github.com/fernet/spec
# NOTE(review): these look like the start/end offsets of the timestamp field
# in a decoded Fernet token; their use is not visible in this part of the
# file -- confirm against the caller.
TIMESTAMP_START = 1
TIMESTAMP_END = 9
class BasePayload(object):
    """Base class holding the conversion helpers shared by token payloads.

    Subclasses provide ``assemble`` and ``disassemble`` and set a unique
    ``version``.

    """

    # each payload variant should have a unique version
    version = None

    @classmethod
    def assemble(cls, *args):
        """Assemble the payload of a token.

        :param args: whatever data should go into the payload
        :returns: the payload of a token
        :raises NotImplementedError: always; subclasses must override

        """
        raise NotImplementedError()

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a payload into the component data.

        :param payload: this variant of payload
        :returns: a tuple of the payloads component data
        :raises NotImplementedError: always; subclasses must override

        """
        raise NotImplementedError()

    @classmethod
    def convert_uuid_hex_to_bytes(cls, uuid_string):
        """Compress UUID formatted strings to bytes.

        :param uuid_string: uuid string to compress to bytes
        :returns: a byte representation of the uuid
        :raises ValueError: if ``uuid.UUID`` rejects the string

        """
        # TODO(lbragstad): Wrap this in an exception. Not sure what the case
        # would be where we couldn't handle what we've been given but incase
        # the integrity of the token has been compromised.
        uuid_obj = uuid.UUID(uuid_string)
        return uuid_obj.bytes

    @classmethod
    def convert_uuid_bytes_to_hex(cls, uuid_byte_string):
        """Generate uuid.hex format based on byte string.

        :param uuid_byte_string: uuid string to generate from
        :returns: uuid hex formatted string
        :raises ValueError: if ``uuid.UUID`` rejects the byte string

        """
        # TODO(lbragstad): Wrap this in an exception. Not sure what the case
        # would be where we couldn't handle what we've been given but incase
        # the integrity of the token has been compromised.
        uuid_obj = uuid.UUID(bytes=uuid_byte_string)
        return uuid_obj.hex

    @classmethod
    def _convert_time_string_to_int(cls, time_string):
        """Convert a time formatted string to a timestamp.

        :param time_string: time formatted string
        :returns: seconds elapsed since the epoch, as produced by
                  ``timedelta.total_seconds()`` (a float, so any sub-second
                  part is kept)

        """
        time_object = timeutils.parse_isotime(time_string)
        return (timeutils.normalize_time(time_object) -
                datetime.datetime.utcfromtimestamp(0)).total_seconds()

    @classmethod
    def _convert_int_to_time_string(cls, time_int):
        """Convert a timestamp to a string.

        :param time_int: timestamp in seconds since the epoch
        :returns: a time formatted string

        """
        time_object = datetime.datetime.utcfromtimestamp(time_int)
        return ks_utils.isotime(time_object, subsecond=True)

    @classmethod
    def attempt_convert_uuid_hex_to_bytes(cls, value):
        """Attempt to convert value to bytes or return value.

        The value is only compressed when converting the bytes back to hex
        reproduces it exactly; otherwise it is left untouched so that the ID
        read back out of a token is the ID that was put in.

        :param value: value to attempt to convert to bytes
        :returns: tuple containing boolean indicating whether the value was
                  stored as bytes and uuid value as bytes or the original
                  value

        """
        try:
            uuid_bytes = cls.convert_uuid_hex_to_bytes(value)
        except ValueError:
            # this might not be a UUID, depending on the situation (i.e.
            # federation)
            return (False, value)
        # uuid.UUID() also accepts dashed, upper-case, braced and urn:uuid:
        # spellings, but the decode side always yields the plain lower-case
        # hex form. Compressing such a value would silently change the ID.
        if cls.convert_uuid_bytes_to_hex(uuid_bytes) != value:
            return (False, value)
        return (True, uuid_bytes)

    @classmethod
    def attempt_convert_uuid_bytes_to_hex(cls, value):
        """Attempt to convert value to hex or return value.

        :param value: value to attempt to convert to hex
        :returns: uuid value in hex or value

        """
        try:
            return cls.convert_uuid_bytes_to_hex(value)
        except ValueError:
            return value
class UnscopedPayload(BasePayload):
    """Payload variant for unscoped tokens (version 0)."""

    version = 0

    @classmethod
    def assemble(cls, user_id, methods, expires_at, audit_ids):
        """Assemble the payload of an unscoped token.

        :param user_id: identifier of the user in the token request
        :param methods: list of authentication methods used
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :returns: the payload of an unscoped token, as a tuple of the packed
                  user ID (a ``(stored_as_bytes, value)`` pair), the methods
                  integer, the expiration timestamp and the packed audit IDs

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, expires_at_int, b_audit_ids)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble an unscoped payload into the component data.

        :param payload: the payload of an unscoped token
        :return: a tuple containing the user_id, auth methods, expires_at, and
                 audit_ids

        """
        # the user ID travels as a (stored_as_bytes, value) pair
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        expires_at_str = cls._convert_int_to_time_string(payload[2])
        audit_ids = list(map(provider.base64_encode, payload[3]))
        return (user_id, methods, expires_at_str, audit_ids)
class DomainScopedPayload(BasePayload):
    """Payload variant for domain-scoped tokens (version 1)."""

    version = 1

    @classmethod
    def assemble(cls, user_id, methods, domain_id, expires_at, audit_ids):
        """Assemble the payload of a domain-scoped token.

        :param user_id: ID of the user in the token request
        :param methods: list of authentication methods used
        :param domain_id: ID of the domain to scope to
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :returns: the payload of a domain-scoped token
        :raises ValueError: if domain_id is neither a UUID nor the configured
                            default domain ID

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        try:
            b_domain_id = cls.convert_uuid_hex_to_bytes(domain_id)
        except ValueError:
            # the default domain ID is configurable, and probably isn't a UUID
            if domain_id == CONF.identity.default_domain_id:
                b_domain_id = domain_id
            else:
                raise
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, b_domain_id, expires_at_int, b_audit_ids)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a payload into the component data.

        :param payload: the payload of a token
        :return: a tuple containing the user_id, auth methods, domain_id,
                 expires_at_str, and audit_ids
        :raises ValueError: if the stored domain ID is neither UUID bytes nor
                            the configured default domain ID

        """
        # the user ID travels as a (stored_as_bytes, value) pair
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        try:
            domain_id = cls.convert_uuid_bytes_to_hex(payload[2])
        except ValueError:
            # the default domain ID is configurable, and probably isn't a UUID
            if payload[2] == CONF.identity.default_domain_id:
                domain_id = payload[2]
            else:
                raise
        expires_at_str = cls._convert_int_to_time_string(payload[3])
        audit_ids = list(map(provider.base64_encode, payload[4]))
        return (user_id, methods, domain_id, expires_at_str, audit_ids)
class ProjectScopedPayload(BasePayload):
    """Payload variant for project-scoped tokens (version 2)."""

    version = 2

    @classmethod
    def assemble(cls, user_id, methods, project_id, expires_at, audit_ids):
        """Assemble the payload of a project-scoped token.

        :param user_id: ID of the user in the token request
        :param methods: list of authentication methods used
        :param project_id: ID of the project to scope to
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :returns: the payload of a project-scoped token

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        b_project_id = cls.attempt_convert_uuid_hex_to_bytes(project_id)
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, b_project_id, expires_at_int, b_audit_ids)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a payload into the component data.

        :param payload: the payload of a token
        :return: a tuple containing the user_id, auth methods, project_id,
                 expires_at_str, and audit_ids

        """
        # user and project IDs both travel as (stored_as_bytes, value) pairs
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        (is_stored_as_bytes, project_id) = payload[2]
        if is_stored_as_bytes:
            project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
        expires_at_str = cls._convert_int_to_time_string(payload[3])
        audit_ids = list(map(provider.base64_encode, payload[4]))
        return (user_id, methods, project_id, expires_at_str, audit_ids)
class TrustScopedPayload(BasePayload):
    """Payload variant for trust-scoped tokens (version 3)."""

    version = 3

    @classmethod
    def assemble(cls, user_id, methods, project_id, expires_at, audit_ids,
                 trust_id):
        """Assemble the payload of a trust-scoped token.

        :param user_id: ID of the user in the token request
        :param methods: list of authentication methods used
        :param project_id: ID of the project to scope to
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :param trust_id: ID of the trust in effect
        :returns: the payload of a trust-scoped token
        :raises ValueError: if trust_id is not a UUID

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        b_project_id = cls.attempt_convert_uuid_hex_to_bytes(project_id)
        # unlike the user and project IDs, the trust ID must be a UUID
        b_trust_id = cls.convert_uuid_hex_to_bytes(trust_id)
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, b_project_id, expires_at_int, b_audit_ids,
                b_trust_id)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a trust-scoped payload into the component data.

        :param payload: the payload of a trust-scoped token
        :returns: a tuple containing the user_id, auth methods, project_id,
                  expires_at_str, audit_ids, and trust_id
        :raises ValueError: if the stored trust ID is not UUID bytes

        """
        # user and project IDs both travel as (stored_as_bytes, value) pairs
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        (is_stored_as_bytes, project_id) = payload[2]
        if is_stored_as_bytes:
            project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
        expires_at_str = cls._convert_int_to_time_string(payload[3])
        audit_ids = list(map(provider.base64_encode, payload[4]))
        trust_id = cls.convert_uuid_bytes_to_hex(payload[5])
        return (user_id, methods, project_id, expires_at_str, audit_ids,
                trust_id)
class FederatedUnscopedPayload(BasePayload):
    """Payload variant for unscoped federated tokens (version 4)."""

    version = 4

    @classmethod
    def pack_group_id(cls, group_dict):
        """Pack the ID of a group for storage in a payload.

        :param group_dict: dictionary with the group's ID under ``'id'``
        :returns: tuple containing boolean indicating whether the group ID
                  was stored as bytes and the ID as bytes or the original
                  value

        """
        return cls.attempt_convert_uuid_hex_to_bytes(group_dict['id'])

    @classmethod
    def unpack_group_id(cls, group_id_in_bytes):
        """Unpack a group ID produced by ``pack_group_id``.

        :param group_id_in_bytes: ``(stored_as_bytes, value)`` pair
        :returns: dictionary with the group's ID under ``'id'``

        """
        (is_stored_as_bytes, group_id) = group_id_in_bytes
        if is_stored_as_bytes:
            group_id = cls.attempt_convert_uuid_bytes_to_hex(group_id)
        return {'id': group_id}

    @classmethod
    def assemble(cls, user_id, methods, expires_at, audit_ids, federated_info):
        """Assemble the payload of a federated token.

        :param user_id: ID of the user in the token request
        :param methods: list of authentication methods used
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :param federated_info: dictionary containing the group IDs
                               (``group_ids``), the identity provider ID
                               (``idp_id``) and the protocol ID
                               (``protocol_id``)
        :returns: the payload of a federated token

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        b_group_ids = list(map(cls.pack_group_id,
                               federated_info['group_ids']))
        b_idp_id = cls.attempt_convert_uuid_hex_to_bytes(
            federated_info['idp_id'])
        # the protocol ID is stored as given, without compression
        protocol_id = federated_info['protocol_id']
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, b_group_ids, b_idp_id, protocol_id,
                expires_at_int, b_audit_ids)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a federated payload into the component data.

        :param payload: the payload of a federated token
        :return: a tuple containing the user_id, auth methods, expires_at_str,
                 audit_ids, and a dictionary containing federated information
                 such as the group IDs, the identity provider ID and the
                 protocol ID

        """
        # user and identity provider IDs travel as (stored_as_bytes, value)
        # pairs
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        group_ids = list(map(cls.unpack_group_id, payload[2]))
        (is_stored_as_bytes, idp_id) = payload[3]
        if is_stored_as_bytes:
            idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
        protocol_id = payload[4]
        expires_at_str = cls._convert_int_to_time_string(payload[5])
        audit_ids = list(map(provider.base64_encode, payload[6]))
        federated_info = dict(group_ids=group_ids, idp_id=idp_id,
                              protocol_id=protocol_id)
        return (user_id, methods, expires_at_str, audit_ids, federated_info)
class FederatedScopedPayload(FederatedUnscopedPayload):
    """Shared implementation for scoped federated token payloads.

    ``version`` is left as ``None`` here; the subclasses visible in this file
    assign the concrete version numbers.

    """

    version = None

    @classmethod
    def assemble(cls, user_id, methods, scope_id, expires_at, audit_ids,
                 federated_info):
        """Assemble the scoped payload of a federated token.

        :param user_id: ID of the user in the token request
        :param methods: list of authentication methods used
        :param scope_id: ID of the project or domain ID to scope to
        :param expires_at: time formatted string of the token's expiration
        :param audit_ids: list of the token's audit IDs
        :param federated_info: dictionary containing the identity provider ID
                               (``idp_id``), the protocol ID
                               (``protocol_id``) and the group IDs
                               (``group_ids``)
        :returns: the payload of a federated token

        """
        b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id)
        methods = auth_plugins.convert_method_list_to_integer(methods)
        b_scope_id = cls.attempt_convert_uuid_hex_to_bytes(scope_id)
        b_group_ids = list(map(cls.pack_group_id,
                               federated_info['group_ids']))
        b_idp_id = cls.attempt_convert_uuid_hex_to_bytes(
            federated_info['idp_id'])
        # the protocol ID is stored as given, without compression
        protocol_id = federated_info['protocol_id']
        expires_at_int = cls._convert_time_string_to_int(expires_at)
        b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes,
                               audit_ids))
        return (b_user_id, methods, b_scope_id, b_group_ids, b_idp_id,
                protocol_id, expires_at_int, b_audit_ids)

    @classmethod
    def disassemble(cls, payload):
        """Disassemble a scoped federated payload into the component data.

        :param payload: the payload of a scoped federated token
        :returns: a tuple containing the user_id, auth methods, scope_id,
                  expiration time (as str), audit_ids, and a dictionary
                  containing federated information such as the identity
                  provider ID, the protocol ID and the group IDs

        """
        # user, scope and identity provider IDs travel as
        # (stored_as_bytes, value) pairs
        (is_stored_as_bytes, user_id) = payload[0]
        if is_stored_as_bytes:
            user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
        methods = auth_plugins.convert_integer_to_method_list(payload[1])
        (is_stored_as_bytes, scope_id) = payload[2]
        if is_stored_as_bytes:
            scope_id = cls.attempt_convert_uuid_bytes_to_hex(scope_id)
        group_ids = list(map(cls.unpack_group_id, payload[3]))
        (is_stored_as_bytes, idp_id) = payload[4]
        if is_stored_as_bytes:
            idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
        protocol_id = payload[5]
        expires_at_str = cls._convert_int_to_time_string(payload[6])
        audit_ids = list(map(provider.base64_encode, payload[7]))
        federated_info = dict(idp_id=idp_id, protocol_id=protocol_id,
                              group_ids=group_ids)
        return (user_id, methods, scope_id, expires_at_str, audit_ids,
                federated_info)
class FederatedProjectScopedPayload(FederatedScopedPayload):
    """Scoped federated payload carrying version number 5."""

    version = 5
class FederatedDomainScopedPayload(FederatedScopedPayload):
    """Scoped federated payload carrying version number 6."""

    version = 6