# This program is free software; you can redistribute it and/or modify
# it under the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright (C) 2014 Red Hat, Inc.
# All rights reserved.
#
# @author: Abhishek Koneru <akoneru@redhat.com>
from __future__ import absolute_import
from __future__ import print_function
import json
import os
from six import iteritems
import pki
import pki.client as client
import pki.account as account
import pki.encoder as encoder
[docs]class ProfileDataInfo(object):
"""Stores information about a profile"""
json_attribute_names = {
'profileId': 'profile_id', 'profileName': 'profile_name',
'profileDescription': 'profile_description', 'profileURL': 'profile_url'
}
def __init__(self):
self.profile_id = None
self.profile_name = None
self.profile_description = None
self.profile_url = None
def __repr__(self):
attributes = {
"ProfileDataInfo": {
'profile_id': self.profile_id,
'name': self.profile_name,
'description': self.profile_description,
'url': self.profile_url
}
}
return str(attributes)
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
profile_data_info = cls()
for k, v in iteritems(attr_list):
if k in ProfileDataInfo.json_attribute_names:
setattr(profile_data_info,
ProfileDataInfo.json_attribute_names[k], v)
else:
setattr(profile_data_info, k, v)
return profile_data_info
[docs]class ProfileDataInfoCollection(object):
"""
Represents a collection of ProfileDataInfo objects.
Also encapsulates the links for the list of the objects stored.
"""
def __init__(self):
self.profile_data_list = []
self.links = []
def __iter__(self):
return iter(self.profile_data_list)
@classmethod
[docs] def from_json(cls, attr_list):
ret = cls()
profile_data_infos = attr_list['entries']
if not isinstance(profile_data_infos, list):
ret.profile_data_list.append(
ProfileDataInfo.from_json(profile_data_infos))
else:
for profile_info in profile_data_infos:
ret.profile_data_list.append(
ProfileDataInfo.from_json(profile_info))
links = attr_list['Link']
if not isinstance(links, list):
ret.links.append(pki.Link.from_json(links))
else:
for link in links:
ret.links.append(pki.Link.from_json(link))
return ret
[docs]class Descriptor(object):
"""
This class represents the description of a ProfileAttribute.
It stores information such as the syntax, constraint and default value of
a profile attribute.
"""
json_attribute_names = {
'Syntax': 'syntax', 'Description': 'description',
'Constraint': 'constraint', 'DefaultValue': 'default_value'
}
def __init__(self, syntax=None, constraint=None, description=None,
default_value=None):
self.syntax = syntax
self.constraint = constraint
self.description = description
self.default_value = default_value
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
descriptor = cls()
for k, v in iteritems(attr_list):
if k in Descriptor.json_attribute_names:
setattr(descriptor,
Descriptor.json_attribute_names[k], v)
else:
setattr(descriptor, k, v)
return descriptor
[docs]class ProfileAttribute(object):
"""
Represents a profile attribute of a ProfileInput.
"""
json_attribute_names = {
'Value': 'value', 'Descriptor': 'descriptor'
}
def __init__(self, name=None, value=None, descriptor=None):
self.name = name
self.value = value
self.descriptor = descriptor
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
attribute = cls()
attribute.name = attr_list['name']
if 'Value' in attr_list:
attribute.value = attr_list['Value']
if 'Descriptor' in attr_list:
attribute.descriptor = Descriptor.from_json(
attr_list['Descriptor'])
return attribute
[docs]class ProfileOutput(object):
"""
This class defines the output of a certificate enrollment request
using a profile.
"""
json_attribute_names = {
'id': 'profile_output_id', 'classId': 'class_id'
}
def __init__(self, profile_output_id=None, name=None, text=None,
class_id=None, attributes=None):
self.profile_output_id = profile_output_id
self.name = name
self.text = text
self.class_id = class_id
if attributes is None:
self.attributes = []
else:
self.attributes = attributes
[docs] def add_attribute(self, profile_attribute):
"""
Add a ProfileAttribute object to the attributes list.
"""
if not isinstance(profile_attribute, ProfileAttribute):
raise ValueError("Object passed is not a ProfileAttribute.")
self.attributes.append(profile_attribute)
[docs] def remove_attribute(self, profile_attribute_name):
"""
Remove a ProfileAttribute object with the given name from the attributes
list.
"""
for attr in self.attributes:
if attr.name == profile_attribute_name:
self.attributes.remove(attr)
break
[docs] def get_attribute(self, profile_attribute_name):
"""
Returns a ProfileAttribute object for the given name.
None, if no match.
"""
for attr in self.attributes:
if attr.name == profile_attribute_name:
return attr
return None
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
profile_output = cls()
for k, v in iteritems(attr_list):
if k not in ['attributes']:
if k in ProfileOutput.json_attribute_names:
setattr(profile_output,
ProfileOutput.json_attribute_names[k], v)
else:
setattr(profile_output, k, v)
attributes = attr_list['attributes']
if not isinstance(attributes, list):
profile_output.attributes.append(
ProfileAttribute.from_json(attributes))
else:
for profile_info in attributes:
profile_output.attributes.append(
ProfileAttribute.from_json(profile_info))
return profile_output
[docs]class ProfileParameter(object):
def __init__(self, name=None, value=None):
self.name = name
self.value = value
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
param = cls()
for attr in attr_list:
setattr(param, attr, attr_list[attr])
return param
[docs]class PolicyDefault(object):
"""
An object of this class contains information of the default usage of a
specific ProfileInput.
"""
json_attribute_names = {
'id': 'name', 'classId': 'class_id',
'policyAttribute': 'policy_attributes', 'params': 'policy_params'
}
def __init__(self, name=None, class_id=None, description=None,
policy_attributes=None, policy_params=None):
self.name = name
self.class_id = class_id
self.description = description
if policy_attributes is None:
self.policy_attributes = []
else:
self.policy_attributes = policy_attributes
if policy_params is None:
self.policy_params = []
else:
self.policy_params = policy_params
[docs] def add_attribute(self, policy_attribute):
"""
Add a policy attribute to the attribute list.
@param policy_attribute - A ProfileAttribute object
"""
if not isinstance(policy_attribute, ProfileAttribute):
raise ValueError("Object passed is not a ProfileAttribute.")
self.policy_attributes.append(policy_attribute)
[docs] def remove_attribute(self, policy_attribute_name):
"""
Remove a policy attribute with the given name from the attributes list.
"""
for attr in self.policy_attributes:
if attr.name == policy_attribute_name:
self.policy_attributes.remove(attr)
break
[docs] def get_attribute(self, policy_attribute_name):
"""
Fetch the policy attribute with the given name from the attributes list.
"""
for attr in self.policy_attributes:
if attr.name == policy_attribute_name:
return attr
return None
[docs] def add_parameter(self, policy_parameter):
"""
Add a profile parameter to the parameters list.
@param policy_parameter - A ProfileParameter object.
"""
if not isinstance(policy_parameter, ProfileParameter):
raise ValueError("Object passed is not a ProfileParameter.")
self.policy_params.append(policy_parameter)
[docs] def remove_parameter(self, profile_parameter_name):
"""
Remove a profile parameter with the given name from the parameters list.
"""
for param in self.policy_params:
if param.name == profile_parameter_name:
self.policy_params.remove(param)
break
[docs] def get_parameter(self, profile_parameter_name):
"""
Fetch a profile parameter with the given name from the parameters list.
"""
for param in self.policy_params:
if param.name == profile_parameter_name:
return param
return None
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy_def = cls()
for k, v in iteritems(attr_list):
if k not in ['policyAttribute', 'params']:
if k in PolicyDefault.json_attribute_names:
setattr(policy_def,
PolicyDefault.json_attribute_names[k], v)
else:
setattr(policy_def, k, v)
if 'policyAttribute' in attr_list:
attributes = attr_list['policyAttribute']
if not isinstance(attributes, list):
policy_def.policy_attributes.append(
ProfileAttribute.from_json(attributes))
else:
for attr in attributes:
policy_def.policy_attributes.append(
ProfileAttribute.from_json(attr))
if 'params' in attr_list:
params = attr_list['params']
if not isinstance(params, list):
policy_def.policy_params.append(
ProfileParameter.from_json(params))
else:
for param in params:
policy_def.policy_params.append(
ProfileParameter.from_json(param))
return policy_def
[docs]class PolicyConstraintValue(object):
"""
Represents a PolicyConstraintValue
"""
def __init__(self, name=None, value=None, descriptor=None):
self.name = name
self.value = value
self.descriptor = descriptor
@property
def name(self):
return getattr(self, 'id')
@name.setter
[docs] def name(self, value):
setattr(self, 'id', value)
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
ret = cls()
ret.name = attr_list['id']
ret.value = attr_list['value']
if 'descriptor' in attr_list:
ret.descriptor = Descriptor.from_json(attr_list['descriptor'])
return ret
[docs]class PolicyConstraint(object):
"""
An object of this class contains the policy constraints applied to a
ProfileInput used by a certificate enrollment request.
"""
json_attribute_names = {
'id': 'name', 'classId': 'class_id',
'constraint': 'policy_constraint_values'
}
def __init__(self, name=None, description=None, class_id=None,
policy_constraint_values=None):
self.name = name
self.description = description
self.class_id = class_id
if policy_constraint_values is None:
self.policy_constraint_values = []
else:
self.policy_constraint_values = policy_constraint_values
[docs] def add_constraint_value(self, policy_constraint_value):
"""
Add a PolicyConstraintValue to the policy_constraint_values list.
"""
if not isinstance(policy_constraint_value, PolicyConstraintValue):
raise ValueError("Object passed not of type PolicyConstraintValue")
self.policy_constraint_values.append(policy_constraint_value)
[docs] def remove_constraint_value(self, policy_constraint_value_name):
"""
Removes a PolicyConstraintValue with the given name form the
policy_constraint_values list.
"""
for attr in self.policy_constraint_values:
if attr.name == policy_constraint_value_name:
self.policy_constraint_values.remove(attr)
break
[docs] def get_constraint_value(self, policy_constraint_value_name):
"""
Returns a PolicyConstraintValue object with the given name.
None, if there is no match.
"""
for constraint in self.policy_constraint_values:
if constraint.name == policy_constraint_value_name:
return constraint
return None
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy_constraint = cls()
for k, v in iteritems(attr_list):
if k not in ['constraint']:
if k in PolicyConstraint.json_attribute_names:
setattr(policy_constraint,
PolicyConstraint.json_attribute_names[k], v)
else:
setattr(policy_constraint, k, v)
if 'constraint' in attr_list:
constraints = attr_list['constraint']
if not isinstance(constraints, list):
policy_constraint.add_constraint_value(
PolicyConstraintValue.from_json(constraints))
else:
for constraint in constraints:
policy_constraint.add_constraint_value(
PolicyConstraintValue.from_json(constraint))
return policy_constraint
[docs]class ProfilePolicy(object):
"""
This class represents the policy a profile adheres to.
An object of this class stores the default values for profile and the
constraints present on the values of the attributes of the profile submitted
for an enrollment request.
"""
json_attribute_names = {
'id': 'policy_id', 'def': 'policy_default',
'constraint': 'policy_constraint'
}
def __init__(self, policy_id=None, policy_default=None,
policy_constraint=None):
self.policy_id = policy_id
self.policy_default = policy_default
self.policy_constraint = policy_constraint
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy = cls()
policy.policy_id = attr_list['id']
if 'def' in attr_list:
policy.policy_default = PolicyDefault.from_json(attr_list['def'])
if 'constraint' in attr_list:
policy.policy_constraint = \
PolicyConstraint.from_json(attr_list['constraint'])
return policy
[docs]class ProfilePolicySet(object):
"""
Stores a list of ProfilePolicy objects.
"""
def __init__(self):
self.policies = []
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy_set = cls()
policies = attr_list['policies']
if not isinstance(policies, list):
policy_set.policies.append(ProfilePolicy.from_json(policies))
else:
for policy in policies:
policy_set.policies.append(ProfilePolicy.from_json(policy))
return policy_set
[docs]class PolicySet(object):
"""
An object of this class contains a name value pair of the
policy name and the ProfilePolicy object.
"""
json_attribute_names = {
'id': 'name', 'value': 'policy_list'
}
def __init__(self, name=None, policy_list=None):
self.name = name
if policy_list is None:
self.policy_list = []
else:
self.policy_list = policy_list
[docs] def add_policy(self, profile_policy):
"""
Add a ProfilePolicy object to the policy_list
"""
if not isinstance(profile_policy, ProfilePolicy):
raise ValueError("Object passed is not a ProfilePolicy.")
self.policy_list.append(profile_policy)
[docs] def remove_policy(self, policy_id):
"""
Removes a ProfilePolicy with the given ID from the PolicySet.
"""
for policy in self.policy_list:
if policy.policy_id == policy_id:
self.policy_list.remove(policy)
break
[docs] def get_policy(self, policy_id):
"""
Returns a ProfilePolicy object with the given profile id.
"""
for policy in self.policy_list:
if policy.policy_id == policy_id:
return policy
return None
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy_set = cls()
policy_set.name = attr_list['id']
policies = attr_list['value']
if not isinstance(policies, list):
policy_set.policy_list.append(ProfilePolicy.from_json(policies))
else:
for policy in policies:
policy_set.policy_list.append(ProfilePolicy.from_json(policy))
return policy_set
[docs]class PolicySetList(object):
"""
An object of this class stores a list of ProfileSet objects.
"""
def __init__(self, policy_sets=None):
if policy_sets is None:
self.policy_sets = []
else:
self.policy_sets = policy_sets
def __iter__(self):
return iter(self.policy_sets)
@property
def policy_sets(self):
return getattr(self, 'PolicySet')
@policy_sets.setter
[docs] def policy_sets(self, value):
setattr(self, 'PolicySet', value)
[docs] def add_policy_set(self, policy_set):
"""
Add a PolicySet object to the policy_sets list.
"""
if not isinstance(policy_set, PolicySet):
raise ValueError("Object passed is not a PolicySet.")
self.policy_sets.append(policy_set)
[docs] def remove_policy_set(self, policy_set_name):
"""
Remove a PolicySet object with the given name from the policy_sets list.
"""
for policy_set in self.policy_sets:
if policy_set.name == policy_set_name:
self.policy_sets.remove(policy_set)
break
[docs] def get_policy_set(self, policy_set_name):
"""
Fetch the PolicySet object for the given name.
Returns None, if not found.
"""
for policy_set in self.policy_sets:
if policy_set.name == policy_set_name:
return policy_set
return None
@classmethod
[docs] def from_json(cls, attr_list):
if attr_list is None:
return None
policy_set_list = cls()
policy_sets = attr_list['PolicySet']
if not isinstance(policy_sets, list):
policy_set_list.policy_sets.append(
PolicySet.from_json(policy_sets))
else:
for policy_set in policy_sets:
policy_set_list.policy_sets.append(
PolicySet.from_json(policy_set))
return policy_set_list
[docs]class Profile(object):
"""
This class represents an enrollment profile.
"""
json_attribute_names = {
'id': 'profile_id', 'classId': 'class_id', 'enabledBy': 'enabled_by',
'authenticatorId': 'authenticator_id', 'authzAcl': 'authorization_acl',
'xmlOutput': 'xml_output', 'Input': 'inputs', 'Output': 'outputs',
'PolicySets': 'policy_set_list'
}
def __init__(self, profile_id=None, class_id=None, name=None,
description=None, enabled=None, visible=None, enabled_by=None,
authenticator_id=None, authorization_acl=None, renewal=None,
xml_output=None, inputs=None, outputs=None,
policy_set_list=None, link=None):
self.profile_id = profile_id
self.name = name
self.class_id = class_id
self.description = description
self.enabled = enabled
self.visible = visible
self.enabled_by = enabled_by
self.authenticator_id = authenticator_id
self.authorization_acl = authorization_acl
self.renewal = renewal
self.xml_output = xml_output
if inputs is None:
self.inputs = []
else:
self.inputs = inputs
if outputs is None:
self.outputs = []
else:
self.outputs = outputs
if policy_set_list is None:
self.policy_set_list = PolicySetList()
else:
self.policy_set_list = policy_set_list
self.link = link
[docs] def add_output(self, profile_output):
"""
Add a ProfileOutput object to the outputs list of the Profile.
"""
if not isinstance(profile_output, ProfileOutput):
raise ValueError("Object passed is not a PolicyOutput.")
if profile_output is None:
raise ValueError("No ProfileOutput object provided.")
self.outputs.append(profile_output)
[docs] def remove_output(self, profile_output_id):
"""
Remove a ProfileOutput from the outputs list of the Profile.
"""
for profile_output in self.outputs:
if profile_output_id == profile_output.profile_output_id:
self.inputs.remove(profile_output)
[docs] def get_output(self, profile_output_id):
"""
Fetches a ProfileOutput with the given ProfileOutput id.
Returns None, if there is no matching output.
"""
for profile_input in self.inputs:
if profile_output_id == profile_input.profile_input_id:
return profile_input
return None
[docs] def add_policy_set(self, policy_set):
"""
Add a PolicySet object to the policy_sets list of the Profile.
"""
if policy_set is None:
raise ValueError("No PolicySet object provided.")
self.policy_set_list.add_policy_set(policy_set)
[docs] def remove_policy_set(self, policy_set_name):
"""
Remove a PolicySet from the policy_sets list of the Profile.
"""
self.policy_set_list.remove_policy_set(policy_set_name)
[docs] def get_policy_set(self, policy_set_name):
"""
Fetches a ProfileInput with the given ProfileInput id.
Returns None, if there is no matching input.
"""
return self.policy_set_list.get_policy_set(policy_set_name)
@classmethod
[docs] def from_json(cls, attr_list):
profile_data = cls()
for k, v in iteritems(attr_list):
if k not in ['Input', 'Output', 'PolicySets']:
if k in Profile.json_attribute_names:
setattr(profile_data,
Profile.json_attribute_names[k], v)
else:
setattr(profile_data, k, v)
profile_inputs = attr_list['Input']
if not isinstance(profile_inputs, list):
profile_data.inputs.append(ProfileInput.from_json(profile_inputs))
else:
for profile_input in profile_inputs:
profile_data.inputs.append(
ProfileInput.from_json(profile_input))
profile_outputs = attr_list['Output']
if not isinstance(profile_outputs, list):
profile_data.outputs.append(
ProfileOutput.from_json(profile_outputs))
else:
for profile_output in profile_outputs:
profile_data.outputs.append(
ProfileOutput.from_json(profile_output))
profile_data.policy_set_list = \
PolicySetList.from_json(attr_list['PolicySets'])
profile_data.link = pki.Link.from_json(attr_list['link'])
return profile_data
def __repr__(self):
attributes = {
"ProfileData": {
'profile_id': self.profile_id,
'name': self.name,
'description': self.description,
'status': ('enabled' if self.enabled else 'disabled'),
'visible': self.visible
}
}
return str(attributes)
@staticmethod
[docs] def get_profile_data_from_file(path_to_file):
"""
Reads the file for the serialized Profile object.
Currently supports only data format in json.
"""
if path_to_file is None:
raise ValueError("File path must be specified.")
with open(path_to_file) as input_file:
data = input_file.read()
if data is not None:
return Profile.from_json(json.loads(data))
return None
[docs]class ProfileClient(object):
"""
This class consists of methods for accessing the ProfileResource.
"""
def __init__(self, connection):
self.connection = connection
self.headers = {'Content-type': 'application/json',
'Accept': 'application/json'}
self.profiles_url = '/rest/profiles'
self.account_client = account.AccountClient(connection)
def _get(self, url, query_params=None, payload=None):
self.account_client.login()
r = self.connection.get(url, self.headers, query_params, payload)
self.account_client.logout()
return r
def _post(self, url, payload=None, query_params=None):
self.account_client.login()
r = self.connection.post(url, payload, self.headers, query_params)
self.account_client.logout()
return r
def _put(self, url, payload=None):
self.account_client.login()
r = self.connection.put(url, payload, self.headers)
self.account_client.logout()
return r
def _delete(self, url):
self.account_client.login()
r = self.connection.delete(url, self.headers)
self.account_client.logout()
return r
@pki.handle_exceptions()
[docs] def list_profiles(self, start=None, size=None):
"""
Fetches the list of profiles.
The start and size arguments provide pagination support.
Returns a ProfileDataInfoCollection object.
"""
query_params = {
'start': start,
'size': size
}
r = self._get(self.profiles_url, query_params)
return ProfileDataInfoCollection.from_json(r.json())
@pki.handle_exceptions()
[docs] def get_profile(self, profile_id):
"""
Fetches information for the profile for the given profile id.
Returns a ProfileData object.
"""
if profile_id is None:
raise ValueError("Profile ID must be specified.")
url = self.profiles_url + '/' + str(profile_id)
r = self._get(url)
return Profile.from_json(r.json())
def _modify_profile_state(self, profile_id, action):
"""
Internal method used to modify the profile state.
"""
if profile_id is None:
raise ValueError("Profile ID must be specified.")
if action is None:
raise ValueError("A valid action(enable/disable) must be "
"specified.")
url = self.profiles_url + '/' + str(profile_id)
params = {'action': action}
self._post(url, query_params=params)
@pki.handle_exceptions()
[docs] def enable_profile(self, profile_id):
"""
Enables a profile.
"""
return self._modify_profile_state(profile_id, 'enable')
@pki.handle_exceptions()
[docs] def disable_profile(self, profile_id):
"""
Disables a profile.
"""
return self._modify_profile_state(profile_id, 'disable')
def _send_profile_create(self, profile_data):
if profile_data is None:
raise ValueError("No ProfileData specified")
profile_object = json.dumps(profile_data, cls=encoder.CustomTypeEncoder,
sort_keys=True)
r = self._post(self.profiles_url, profile_object)
return Profile.from_json(r.json())
def _send_profile_modify(self, profile_data):
if profile_data is None:
raise ValueError("No ProfileData specified")
if profile_data.profile_id is None:
raise ValueError("Profile Id is not specified.")
profile_object = json.dumps(profile_data, cls=encoder.CustomTypeEncoder,
sort_keys=True)
url = self.profiles_url + '/' + str(profile_data.profile_id)
r = self._put(url, profile_object)
return Profile.from_json(r.json())
@pki.handle_exceptions()
[docs] def create_profile(self, profile_data):
"""
Create a new profile for the given Profile object.
"""
return self._send_profile_create(profile_data)
@pki.handle_exceptions()
[docs] def modify_profile(self, profile_data):
"""
Modify an existing profile with the given Profile object.
"""
return self._send_profile_modify(profile_data)
[docs] def create_profile_from_file(self, path_to_file):
"""
Reads the file for the serialized Profile object.
Performs the profile create operation.
Currently supports only data format in json.
"""
profile_data = Profile.get_profile_data_from_file(path_to_file)
return self._send_profile_create(profile_data)
[docs] def modify_profile_from_file(self, path_to_file):
"""
Reads the file for the serialized Profile object.
Performs the profile modify operation.
Currently supports only data format in json.
"""
profile_data = Profile.get_profile_data_from_file(path_to_file)
return self._send_profile_modify(profile_data)
@pki.handle_exceptions()
[docs] def delete_profile(self, profile_id):
"""
Delete a profile with the given Profile Id.
"""
if profile_id is None:
raise ValueError("Profile Id must be specified.")
url = self.profiles_url + '/' + str(profile_id)
r = self._delete(url)
return r
encoder.NOTYPES['Profile'] = Profile
encoder.NOTYPES['ProfileInput'] = ProfileInput
encoder.NOTYPES['ProfileOutput'] = ProfileOutput
encoder.NOTYPES['ProfileAttribute'] = ProfileAttribute
encoder.NOTYPES['Descriptor'] = Descriptor
encoder.NOTYPES['PolicySetList'] = PolicySetList
encoder.NOTYPES['PolicySet'] = PolicySet
encoder.NOTYPES['ProfilePolicy'] = ProfilePolicy
encoder.NOTYPES['PolicyDefault'] = PolicyDefault
encoder.NOTYPES['PolicyConstraint'] = PolicyConstraint
encoder.NOTYPES['ProfileParameter'] = ProfileParameter
encoder.NOTYPES['PolicyConstraintValue'] = PolicyConstraintValue
encoder.NOTYPES['Link'] = pki.Link
[docs]def main():
# Initialize a PKIConnection object for the CA
connection = client.PKIConnection('https', 'localhost', '8443', 'ca')
# The pem file used for authentication. Created from a p12 file using the
# command -
# openssl pkcs12 -in <p12_file_path> -out /tmp/auth.pem -nodes
connection.set_authentication_cert("/tmp/auth.pem")
# Initialize the ProfileClient class
profile_client = ProfileClient(connection)
# Folder to store the files generated during test
file_path = '/tmp/profile_client_test/'
if not os.path.exists(file_path):
os.makedirs(file_path)
# Fetching a list of profiles
profile_data_infos = profile_client.list_profiles()
print('List of profiles:')
print('-----------------')
for profile_data_info in profile_data_infos:
print(' Profile ID: ' + profile_data_info.profile_id)
print(' Profile Name: ' + profile_data_info.profile_name)
print(' Profile Description: ' + profile_data_info.profile_description)
print()
# Get a specific profile
profile_data = profile_client.get_profile('caUserCert')
print('Profile Data for caUserCert:')
print('----------------------------')
print(' Profile ID: ' + profile_data.profile_id)
print(' Profile Name: ' + profile_data.name)
print(' Profile Description: ' + profile_data.description)
print(' Is profile enabled? ' + str(profile_data.enabled))
print(' Is profile visible? ' + str(profile_data.visible))
print()
# Disabling a profile
print('Disabling a profile:')
print('--------------------')
profile_client.disable_profile('caUserCert')
profile = profile_client.get_profile('caUserCert')
print(' Profile ID: ' + profile.profile_id)
print(' Is profile enabled? ' + str(profile.enabled))
print()
# Disabling a profile
print('Enabling a profile:')
print('-------------------')
profile_client.enable_profile('caUserCert')
profile = profile_client.get_profile('caUserCert')
print(' Profile ID: ' + profile_data.profile_id)
print(' Is profile enabled? ' + str(profile.enabled))
print()
# profile_client.delete_profile('MySampleProfile')
# Create a new sample profile
print('Creating a new profile:')
print('-----------------------')
profile_data = Profile(name="My Sample User Cert Enrollment",
profile_id="MySampleProfile",
class_id="caEnrollImpl",
description="Example User Cert Enroll Impl",
enabled_by='admin', enabled=False, visible=False,
renewal=False, xml_output=False,
authorization_acl="")
# Adding a profile input
profile_input = ProfileInput("i1", "subjectNameInputImpl")
profile_input.add_attribute(ProfileAttribute("sn_uid"))
profile_input.add_attribute(ProfileAttribute("sn_e"))
profile_input.add_attribute(ProfileAttribute("sn_c"))
profile_input.add_attribute(ProfileAttribute("sn_ou"))
profile_input.add_attribute(ProfileAttribute("sn_ou1"))
profile_input.add_attribute(ProfileAttribute("sn_ou2"))
profile_input.add_attribute(ProfileAttribute("sn_ou3"))
profile_input.add_attribute(ProfileAttribute("sn_cn"))
profile_input.add_attribute(ProfileAttribute("sn_o"))
profile_data.add_input(profile_input)
# Adding a profile output
profile_output = ProfileOutput("o1", name="Certificate Output",
class_id="certOutputImpl")
profile_output.add_attribute(ProfileAttribute("pretty_cert"))
profile_output.add_attribute(ProfileAttribute("b64_cert"))
profile_data.add_output(profile_output)
# Create a Policy set with a list of profile policies
policy_list = []
# Creating profile policy
policy_default = PolicyDefault("Subject Name Default",
"userSubjectNameDefaultImpl",
"This default populates a User-Supplied "
"Certificate Subject Name to the request.")
attr_descriptor = Descriptor(syntax="string", description="Subject Name")
policy_attribute = ProfileAttribute("name", descriptor=attr_descriptor)
policy_default.add_attribute(policy_attribute)
policy_constraint = PolicyConstraint("Subject Name Constraint",
"This constraint accepts the subject "
"name that matches UID=.*",
"subjectNameConstraintImpl")
constraint_descriptor = Descriptor(syntax="string",
description="Subject Name Pattern")
policy_constraint_value = PolicyConstraintValue("pattern",
"UID=.*",
constraint_descriptor)
policy_constraint.add_constraint_value(policy_constraint_value)
policy_list.append(ProfilePolicy("1", policy_default, policy_constraint))
# Creating another profile policy
# Defining the policy default
policy_default = PolicyDefault("Validity Default", "validityDefaultImpl",
"This default populates a Certificate "
"Validity to the request. The default "
"values are Range=180 in days")
attr_descriptor = Descriptor(syntax="string", description="Not Before")
policy_attribute = ProfileAttribute(
"notBefore",
descriptor=attr_descriptor)
policy_default.add_attribute(policy_attribute)
attr_descriptor = Descriptor(syntax="string", description="Not After")
policy_attribute = ProfileAttribute("notAfter", descriptor=attr_descriptor)
policy_default.add_attribute(policy_attribute)
profile_param = ProfileParameter("range", 180)
profile_param2 = ProfileParameter("startTime", 0)
policy_default.add_parameter(profile_param)
policy_default.add_parameter(profile_param2)
# Defining the policy constraint
policy_constraint = PolicyConstraint("Validity Constraint",
"This constraint rejects the validity "
"that is not between 365 days.",
"validityConstraintImpl")
constraint_descriptor = Descriptor(syntax="integer",
description="Validity Range (in days)",
default_value=365)
policy_constraint_value = PolicyConstraintValue("range", 365,
constraint_descriptor)
policy_constraint.add_constraint_value(policy_constraint_value)
constraint_descriptor = Descriptor(syntax="boolean", default_value=False,
description="Check Not Before against"
" current time")
policy_constraint_value = PolicyConstraintValue("notBeforeCheck", False,
constraint_descriptor)
policy_constraint.add_constraint_value(policy_constraint_value)
constraint_descriptor = Descriptor(syntax="boolean", default_value=False,
description="Check Not After against"
" Not Before")
policy_constraint_value = PolicyConstraintValue("notAfterCheck", False,
constraint_descriptor)
policy_constraint.add_constraint_value(policy_constraint_value)
policy_list.append(ProfilePolicy("2", policy_default, policy_constraint))
policy_set = PolicySet("userCertSet", policy_list)
profile_data.add_policy_set(policy_set)
# Write the profile data object to a file for testing a file input
with open(file_path + '/original.json', 'w') as output_file:
output_file.write(json.dumps(profile_data,
cls=encoder.CustomTypeEncoder,
sort_keys=True, indent=4))
# Create a new profile
created_profile = profile_client.create_profile(profile_data)
print(created_profile)
print()
# Test creating a new profile with a duplicate profile id
print("Create a profile with duplicate profile id.")
print("-------------------------------------------")
try:
profile_data = Profile(name="My Sample User Cert Enrollment",
profile_id="MySampleProfile",
class_id="caEnrollImpl",
description="Example User Cert Enroll Impl",
enabled_by='admin', enabled=False, visible=False,
renewal=False, xml_output=False,
authorization_acl="")
profile_input = ProfileInput("i1", "subjectNameInputImpl")
profile_input.add_attribute(ProfileAttribute("sn_uid"))
profile_input.add_attribute(ProfileAttribute("sn_e"))
profile_input.add_attribute(ProfileAttribute("sn_c"))
profile_input.add_attribute(ProfileAttribute("sn_ou"))
profile_input.add_attribute(ProfileAttribute("sn_ou1"))
profile_input.add_attribute(ProfileAttribute("sn_ou2"))
profile_input.add_attribute(ProfileAttribute("sn_ou3"))
profile_input.add_attribute(ProfileAttribute("sn_cn"))
profile_input.add_attribute(ProfileAttribute("sn_o"))
profile_data.add_input(profile_input)
profile_client.create_profile(profile_data)
# pylint: disable=W0703
except pki.BadRequestException as e:
print('MySampleProfile ' + str(e))
print()
# Modify the above created profile
print('Modifying the profile MySampleProfile.')
print('-----------------------------------')
fetch = profile_client.get_profile('MySampleProfile')
profile_input2 = ProfileInput("i2", "keyGenInputImpl")
profile_input2.add_attribute(ProfileAttribute("cert_request_type"))
profile_input2.add_attribute(ProfileAttribute("cert_request"))
fetch.add_input(profile_input2)
fetch.name += " (Modified)"
modified_profile = profile_client.modify_profile(fetch)
with open(file_path + 'modified.json', 'w') as output_file:
output_file.write(json.dumps(fetch, cls=encoder.CustomTypeEncoder,
sort_keys=True, indent=4))
print(modified_profile)
print()
# Delete a profile
print("Deleting the profile MySampleProfile.")
print("----------------------------------")
profile_client.delete_profile('MySampleProfile')
print("Deleted profile MySampleProfile.")
print()
# Testing deletion of a profile
print('Test profile deletion.')
print('----------------------')
try:
profile_client.get_profile('MySampleProfile')
# pylint: disable=W0703
except pki.ProfileNotFoundException as e:
print(str(e))
print()
# Creating a profile from file
print('Creating a profile using file input.')
print('------------------------------------')
original = profile_client.create_profile_from_file(
file_path + 'original.json')
print(original)
print()
# Modifying a profile from file
print('Modifying a profile using file input.')
print('------------------------------------')
modified = profile_client.modify_profile_from_file(
file_path + 'modified.json')
print(modified)
print()
# Test clean up
profile_client.delete_profile('MySampleProfile')
os.remove(file_path + 'original.json')
os.remove(file_path + 'modified.json')
os.removedirs(file_path)
if __name__ == "__main__":
main()