# Authors:
# Endi S. Dewata <edewata@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright (C) 2013 Red Hat, Inc.
# All rights reserved.
#
"""
Module containing utility functions and classes for the Dogtag Python code.
"""
from __future__ import absolute_import
import functools
import os
import re
import shutil
from shutil import Error
try:
from shutil import WindowsError # pylint: disable=E0611
except ImportError:
WindowsError = None
import six
import subprocess
# Environment files that read_environment_files() sources, in this order,
# when the caller does not supply its own list.
DEFAULT_PKI_ENV_LIST = [
'/usr/share/pki/etc/pki.conf',
'/etc/pki/pki.conf',
]
def copy(source, dest):
    """
    Copy a file, or a folder together with everything beneath it.

    One trailing slash is dropped from each path. The parent folder of
    dest is created first (mirroring the parent of source) via
    copydirs(). A regular file is then copied with copyfile(); anything
    else is walked with os.walk() and reproduced folder by folder and
    file by file under dest.

    :param source: path of the file or folder to copy (must not be empty)
    :param dest: path the copy should be created at (must not be empty)
    """
    # strip a single trailing slash so the prefix arithmetic below works
    if source[-1] == '/':
        source = source[:-1]
    if dest[-1] == '/':
        dest = dest[:-1]

    # make sure the folder that will contain dest exists
    copydirs(os.path.dirname(source), os.path.dirname(dest))

    if os.path.isfile(source):
        copyfile(source, dest)
        return

    prefix_length = len(source)
    for current_dir, _, file_names in os.walk(source):
        # map the visited folder onto the matching folder below dest;
        # an empty result means the file system root
        target_dir = (dest + current_dir[prefix_length:]) or '/'

        copydirs(current_dir, target_dir)

        for file_name in file_names:
            copyfile(os.path.join(current_dir, file_name),
                     os.path.join(target_dir, file_name))
def copyfile(source, dest, overwrite=True):
    """
    Copy a file or link while preserving its attributes.

    A symbolic link is recreated at dest with the same target and the
    same owner/group as the link itself. A regular file has its content
    copied and its access/modification times, mode and owner/group
    applied to dest.

    :param source: path of the file or symbolic link to copy
    :param dest: path to create
    :param overwrite: when false and dest already exists (including a
        dangling symbolic link), nothing is done
    :raises OSError: if any of the underlying file operations fails
    """
    # lexists() also reports dangling symlinks, which still occupy dest
    dest_exists = os.path.lexists(dest)

    # if dest already exists and not overwriting, do nothing
    if dest_exists and not overwrite:
        return

    if os.path.islink(source):
        # os.symlink() refuses to replace an existing entry, so an
        # overwrite has to remove the old one first
        if dest_exists:
            os.remove(dest)

        target = os.readlink(source)
        os.symlink(target, dest)

        # use the link's own ownership, not that of the file it points to
        stat = os.lstat(source)
        os.lchown(dest, stat.st_uid, stat.st_gid)

    else:
        shutil.copyfile(source, dest)

        stat = os.stat(source)
        os.utime(dest, (stat.st_atime, stat.st_mtime))
        os.chmod(dest, stat.st_mode)
        os.chown(dest, stat.st_uid, stat.st_gid)
def copydirs(source, dest):
    """
    Copy a folder and its parents while preserving their attributes.

    Missing ancestors of dest are created first, each one mirroring the
    corresponding ancestor of source. Every folder created receives the
    access/modification times, mode and owner/group of its source
    counterpart. Nothing is done if dest already exists or is empty.

    :param source: existing folder whose attributes are copied
    :param dest: folder to create
    :raises OSError: if a folder cannot be created or its attributes
        cannot be read or applied
    """
    # An empty dest is what os.path.dirname() returns for a relative path
    # without a directory part; it stands for the current directory, so
    # there is nothing to create (and recursing further would never end).
    if not dest or os.path.exists(dest):
        return

    destparent = os.path.dirname(dest)

    if not os.path.exists(destparent):
        sourceparent = os.path.dirname(source)
        copydirs(sourceparent, destparent)

    os.mkdir(dest)

    stat = os.stat(source)
    os.utime(dest, (stat.st_atime, stat.st_mtime))
    os.chmod(dest, stat.st_mode)
    os.chown(dest, stat.st_uid, stat.st_gid)
def chown(path, uid, gid):
    """
    Recursively set the owner and group of a file or folder.

    The ownership of path itself is always changed. If path is a folder,
    every entry in it that os.path.isfile() or os.path.isdir() recognizes
    is processed the same way; other entries are left untouched.

    :param path: file or folder to change
    :param uid: numeric user id to assign
    :param gid: numeric group id to assign
    """
    os.chown(path, uid, gid)

    if os.path.isdir(path):
        for entry in os.listdir(path):
            child = os.path.join(path, entry)
            # for a plain file the recursive call only applies os.chown()
            if os.path.isfile(child) or os.path.isdir(child):
                chown(child, uid, gid)
def chmod(path, perms):
    """
    Recursively set the permission bits of a file or folder.

    The mode of path itself is always changed. If path is a folder,
    every entry in it that os.path.isfile() or os.path.isdir() recognizes
    is processed the same way; other entries are left untouched.

    :param path: file or folder to change
    :param perms: numeric mode passed to os.chmod()
    """
    os.chmod(path, perms)

    if os.path.isdir(path):
        for entry in os.listdir(path):
            child = os.path.join(path, entry)
            # for a plain file the recursive call only applies os.chmod()
            if os.path.isfile(child) or os.path.isdir(child):
                chmod(child, perms)
def customize_file(input_file, output_file, params):
    """
    Write a copy of a file with parameter substitutions applied.

    Each line of input_file is written to output_file after every key of
    params found in it has been replaced by the corresponding value. The
    replacements are applied one after another in the iteration order of
    params.

    :param input_file: path of the template file to read
    :param output_file: path of the file to write (truncated if present)
    :param params: mapping of text to search for -> replacement text
    """
    def substitute(text):
        # apply every replacement in turn to a single line
        for placeholder, replacement in params.items():
            text = text.replace(placeholder, replacement)
        return text

    with open(input_file) as reader, open(output_file, 'w') as writer:
        writer.writelines(substitute(text) for text in reader)
def load_properties(filename, properties):
    """
    Read name=value pairs from a file into an existing mapping.

    Blank lines and lines whose first non-blank character is '#' are
    skipped. A value ending with a backslash continues on the next line:
    the backslash is dropped and the following line is appended verbatim.
    Trailing whitespace is stripped from the final line of each value.

    :param filename: path of the properties file to read
    :param properties: mapping that receives the parsed entries
    :raises Exception: if a property line contains no '=' delimiter
    """
    with open(filename) as stream:
        content = stream.read().splitlines()

    key = None
    continued = False

    for line_number, text in enumerate(content, 1):

        if continued:
            # this line extends the value stored for the previous key
            current = properties[key] + text

        else:
            # this line starts a new property
            text = text.lstrip()
            if not text or text.startswith('#'):
                continue

            key_part, delimiter, value_part = text.partition('=')
            if not delimiter:
                raise Exception('Missing delimiter in %s line %d' %
                                (filename, line_number))

            key = key_part.rstrip()
            current = value_part.lstrip()

        # a trailing backslash means the value goes on in the next line
        continued = current.endswith('\\')
        if continued:
            current = current[:-1]
        else:
            current = current.rstrip()

        properties[key] = current
def store_properties(filename, properties):
    """
    Write a mapping to a file as one name=value line per entry.

    :param filename: path of the file to write (truncated if present)
    :param properties: mapping of property names to values
    """
    with open(filename, 'w') as stream:
        stream.writelines(
            '%s=%s\n' % entry for entry in properties.items())
def copytree(src, dst, symlinks=False, ignore=None):
    """
    Recursively copy a directory tree using copy2().

    PATCH:  This code was copied from 'shutil.py' and patched to
            allow 'The destination directory to already exist.'

    If exception(s) occur, an Error is raised with a list of reasons;
    each reason is a (source, destination, message) tuple.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    Consider this example code rather than the ultimate tool.
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    # PATCH:  ONLY execute 'os.makedirs(dst)' if the top-level
    #         destination directory does NOT exist!
    if not os.path.exists(dst):
        os.makedirs(dst)

    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                # Will raise a SpecialFileError for unsupported file types
                shutil.copy2(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except EnvironmentError as why:
            errors.append((srcname, dstname, str(why)))

    try:
        shutil.copystat(src, dst)
    except OSError as why:
        if WindowsError is not None and isinstance(why, WindowsError):
            # Copying file access times may fail on Windows
            pass
        else:
            # record one (src, dst, reason) entry like every other failure;
            # extend() would scatter the three fields over the list
            errors.append((src, dst, str(why)))

    if errors:
        raise Error(errors)
def read_environment_files(env_file_list=None):
    """
    Load the variables defined by shell environment files into os.environ.

    The files are sourced one after another in a single bash process,
    which then prints its environment with 'env'. Every NAME=VALUE line
    of that output is stored in os.environ, except lines with a blank
    name and the special '_' variable.

    :param env_file_list: paths of the files to source; defaults to
        DEFAULT_PKI_ENV_LIST when None
    :raises subprocess.CalledProcessError: if the bash command fails
    """
    if env_file_list is None:
        env_file_list = DEFAULT_PKI_ENV_LIST

    source_steps = ['source {}'.format(path) for path in env_file_list]
    script = ' && '.join(source_steps) + ' && env'

    raw_output = subprocess.check_output(['bash', '-c', script])

    for entry in raw_output.decode('utf-8').split('\n'):
        key, _, value = entry.partition("=")
        # ignore blank names and bash's own '_' variable
        if key.strip() and key != u'_':
            os.environ[key] = value
@functools.total_ordering
class Version(object):
    """
    A <major>.<minor>.<patch> version number that supports comparison.

    Instances are built either from a string that starts with
    '<major>.<minor>.<patch>' (anything after the patch number is
    ignored) or from another Version, which is copied.

    Comparing against an object that is not a Version returns
    NotImplemented, so '==' / '!=' fall back to Python's default and the
    ordering operators raise TypeError. Instances are not hashable.
    """

    def __init__(self, obj):
        """
        :param obj: version string or Version instance to copy
        :raises Exception: if the string cannot be parsed or obj is of
            an unsupported type
        """
        if isinstance(obj, Version):
            self.major = obj.major
            self.minor = obj.minor
            self.patch = obj.patch

        elif isinstance(obj, six.string_types):

            # parse <major>.<minor>.<patch>[<suffix>]
            match = re.match(r'^(\d+)\.(\d+)\.(\d+)', obj)

            if match is None:
                raise Exception('Unable to parse version number: %s' % obj)

            self.major = int(match.group(1))
            self.minor = int(match.group(2))
            self.patch = int(match.group(3))

        else:
            raise Exception('Unsupported version type: %s' % type(obj))

    def _key(self):
        # tuple used for every comparison; release is ignored in comparisons
        return (self.major, self.minor, self.patch)

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        # defined explicitly because Python 2 does not derive it from __eq__
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key() < other._key()

    def __gt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key() > other._key()

    # not hashable
    __hash__ = None

    def __repr__(self):
        return '%d.%d.%d' % (self.major, self.minor, self.patch)