import os
from distutils.dir_util import remove_tree
from tempfile import mkdtemp as mkdtemp_orig, gettempdir
from shutil import Error, copy2, copystat, rmtree
[docs]def is_subdir(path, base_dir, dereference_symlinks=True):
''' Determines if the path is in the base_dir
Parameters
----------
path : str
The Path
base_dir : str
The Base Directory
Returns
-------
tuple
containing (True/False, and remainder (relative) of path)
str
Remainder, the relative different between the two paths. If on Windows
and the paths are on two different drives, the entire path is returned
Note
----
This will NOT work with Junctions in Windows.
'''
if dereference_symlinks:
path = os.path.realpath(path)
base_dir = os.path.realpath(base_dir)
path = os.path.normcase(os.path.normpath(os.path.abspath(path)))
base_dir = os.path.normcase(os.path.normpath(os.path.abspath(base_dir)))
try:
relative = os.path.relpath(path, base_dir)
except ValueError:
return (False, path)
if relative.startswith(os.pardir):
return (False, relative)
return (True, relative)
[docs]def mkdtemp(*args, **kwargs):
''' Version of tempfile.mkdtemp that is r/w by uid and gid
Parameters
----------
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
Returns
-------
str
The Temp Directory
'''
tempdir = mkdtemp_orig(*args, **kwargs)
os.chmod(tempdir, 0o770)
return tempdir
[docs]class Chdir(object):
''' Context to change dir and guarantee you get back to your last directory
Example::
These are written in doctest format, and should illustrate how to
use the function.
>>> a=[1,2,3]
>>> print [x + 3 for x in a]
[4, 5, 6]
>>> import os
>>> import tempfile
>>> from vsi.tools.dir_util import Chdir
>>> os.chdir(os.path.abspath(os.sep))
>>> print(os.getcwd())
/
>>> with Chdir(tempfile.tempdir):
... print(os.getcwd())
/tmp
>>> print(os.getcwd())
/
'''
def __init__(self, dir, create=False, error_on_exit=False):
self.dir = dir
self.oldDir = None
self.create = create
self.error_on_exit = error_on_exit
''' Create Chdir object
Parameters
----------
dir : str
the directory you want to change directory to
create : str
Optional: Create the directory if it doesn't exist (else os error 2
will occur) Default: False
error_on_exit : bool
When exiting the with loop, if the return directory does not exist
anymore, (OSError 2), if this is true an error is thrown, else it
is ignore and you are left in the new directory. Default: False
'''
def __enter__(self):
self.oldDir = os.getcwd()
if self.create and not os.path.exists(self.dir):
os.makedirs(self.dir)
os.chdir(self.dir)
[docs] def __exit__(self, exc_type, exc_value, traceback):
''' Exits if the previous directory no longer exists.
Parameters
----------
exc_type : str
The Execption Type
exc_value : float
The Exception Value
traceback : str
The Traceback
'''
try:
os.chdir(self.oldDir)
except OSError as osError:
if osError.errno == 2 and self.error_on_exit:
print("Previous directory does not exist anymore.\nCannot return to {}"
.format(self.oldDir))
#Return you to / or the c:\ or d:\, etc... If THIS errors, something
#is seriously WRONG
os.chdir(os.path.abspath(os.sep))
else:
raise(osError)
# Deprecated: Python3 tempfile.TemporaryDirectory
# class TempDir(object):
# ''' Create and clean up a temp directory
# The most common usage for this would be to create a random directory
# each time. The mkdtemp flag was added to facilitate this.
# Example:
# >>> from vsi.tools.dir_util import TempDir
# >>> import tempfile
# >>> from glob import glob
# >>> with TempDir(tempfile.tempdir, mkdtemp=True) as tempDir:
# ... print(tempDir)
# ... print(glob(tempDir))
# >>> print(glob(tempDir))
# '''
# def __init__(self, dir=None, cd=False, delete=True, mkdtemp=False,
# delete_if_not_create=False, *args, **kwargs):
# self.base_dir= dir
# self.mkdtemp = mkdtemp
# ''' Create a temp dir object
# Parameters
# ----------
# dir : str
# Base dir must exist. Default is None. None will only work if
# mkdtemp is true.
# cd : bool
# Change into the temp dir (and change back). Default: false
# mkdtemp : array_like
# Call mkdtemp in dir to create a temp dir. Default: false
# | Try and use this INSTEAD of delete_if_not_create, much safer.
# The class must call mkdtemp for you, or else it will not have
# "created" the directory, thus introducing unsafe situations where
# a directory may be inadvertently deleted if you accidentally tel
# it to be.
# delete : bool
# Delete on exit. Default: true
# delete_if_not_create : bool
# Default: false. Delete the directory "dir" even if it was not created
# by TempDir. ONLY TURN THIS ON when you are absolutely sure you want
# this directory deleted. Make sure the input dir is something you
# want deleted, or else just don't use this argument! For example, in
# an unsanitized case the tempdir could be / or something like
# /home/username. Then / or the entire home directory would be
# deleted! That has to be bad, right?
# *args
# Variable length argument list.
# **kwargs
# Passed on to Chdir (mostly for error_on_exit)
# '''
# if self.mkdtemp and not self.base_dir:
# self.base_dir = gettempdir()
# self.cd = Chdir(self.base_dir, *args, **kwargs) if cd else None
# self.delete = delete
# self.delete_if_not_create=delete_if_not_create
# self.created_dir = False
# def __enter__(self):
# if self.mkdtemp:
# if not os.path.exists(self.base_dir):
# os.makedirs(self.base_dir)
# self.dir = mkdtemp(dir=self.base_dir)
# if self.cd:
# self.cd.dir = self.dir #<-- hack
# self.created_dir = True
# else:
# #These two are separate variables incase with is called twice on the
# #same instance... It COULD happen
# self.dir = self.base_dir
# if not os.path.exists(self.dir):
# os.makedirs(self.dir)
# self.created_dir = True
# if self.cd:
# self.cd.__enter__()
# return self.dir
# def __exit__(self, exc_type, exc_value, traceback):
# ''' Exits if the previous directory no longer exists.
# Parameters
# ----------
# exc_type : str
# The Execption Type
# exc_value : float
# The Exception Value
# traceback : str
# The Traceback
# '''
# if self.cd:
# self.cd.__exit__(exc_type, exc_value, traceback)
# #The reason for not making this all automatically work is JUST IN
# #CASE someone says something like / is the temp dir... Can you
# #Imagine running remove tree on / or /home/user? BAD THINGS.
# #It is up to the dev to make sure he knows what he is doing
# if self.delete and (self.created_dir or self.delete_if_not_create):
# remove_tree(self.dir)
[docs]def checksum_dir(checksum, checksum_depth=2, base_dir=None):
''' Generate checksum directory name
Parameters
---------
checksum : float
The Checksum
checksum_depth : int
The Checksum Depth
base_dir : str
The base Directory
Returns
------
str
The Path
Example
-------
If the checksum was 1234567890abcdef, and the depth was 3, you would get
12/34/56/1234567890abcdef
Note
----
An optional base_dir will be prefixed
'''
dir_parts = [checksum[x:x+2] if x<checksum_depth*2 else checksum \
for x in range(0,checksum_depth*2+1,2)]
if base_dir:
return os.path.join(base_dir, *dir_parts)
else:
return os.path.join(*dir_parts)
#copy from shutil actually
[docs]def copytree(src, dst, symlinks=False, ignore=None):
"""Recursively copy a directory tree using copy2().
Parameters
----------
src : :term:`path-like object`
The Source Tree
dst : str
The Destination Directory may not already exist. If exception(s) occur,
an Error is raised with a list of reasons. If the destination
directory exists, it will be clobber by the source, including replacing
entire directory trees with symlinks, if the symlinks flags is true.
symlinks : bool
Optional. True if the symbolic links in the source tree result in
symbolic links in the destination tree. False if the contents of the
files pointed to by symbolic links are copied.
ignore : :term:`function`
The optional ignore argument is a callable. If given, it is called with
the `src` parameter, which is the directory being visited by
copytree(), and `names` which is the list of `src` contents, as
returned by os.listdir():
callable(src, names) -> ignored_names
Since copytree() is called recursively, the callable will be called
once for each directory that is copied. It returns a list of names
relative to the `src` directory that should not be copied.
XXX Consider this example code rather than the ultimate tool.
"""
names = os.listdir(src)
if ignore is not None:
ignored_names = ignore(src, names)
else:
ignored_names = set()
if not os.path.exists(dst):
os.makedirs(dst)
else:
os.chmod(dst, 0o777)
errors = []
for name in names:
if name in ignored_names:
continue
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
if symlinks and os.path.islink(srcname):
linkto = os.readlink(srcname)
if os.path.exists(dstname) and os.path.isdir(dstname):
rmtree(dstname)
else:
os.unlink(dstname)
os.symlink(linkto, dstname)
elif os.path.isdir(srcname):
copytree(srcname, dstname, symlinks, ignore)
else:
if os.path.exists(dstname):
os.unlink(dstname)
# Will raise a SpecialFileError for unsupported file types
copy2(srcname, dstname)
# catch the Error from the recursive copytree so that we can
# continue with other files
except Error as err:
errors.extend(err.args[0])
except EnvironmentError as why:
errors.append((srcname, dstname, str(why)))
try:
copystat(src, dst)
except OSError as why:
if WindowsError is not None and isinstance(why, WindowsError):
# Copying file access times may fail on Windows
pass
else:
errors.append((src, dst, str(why)))
if errors:
raise Error(errors)
[docs]def root_dir(directory):
''' OS independent way of getting the root directory of a directory
Parameters
----------
directory : str
The Directory
Returns
-------
str
The Root Directory
On Windows::
C:\\tmp\\blah.txt -> C:\\
D:\\Program Files\\calc.exe -> D:\\
On Linux/Darwin::
/tmp/blah.txt -> /
'''
return os.path.splitdrive(directory)[0] + os.sep
[docs]def samefile(path1, path2, normpath=True):
''' OS independent version of os.path.samefile
Parameters
----------
path1 : str
The File Name
path2 : str
The File Name
normpath : bool
Optional normpath=True. In posix cases when you want symlinks to be
followed instead of normalized out, this would be useful to set to
false.
Returns
-------
str
OS independent version of the path
Example:
>>> os.symlink('/usr/bin', '/tmp/blah')
>>> samefile('/usr', '/tmp/blah/..')
False
>>> samefile('/usr', '/tmp/blah/..', normpath=False)
True
'''
if normpath:
path1 = os.path.normpath(path1)
path2 = os.path.normpath(path2)
if hasattr(os.path, 'samefile'):
return os.path.samefile(path1, path2)
else:
return os.path.realpath(path1) == os.path.realpath(path2)
[docs]def prune_dir(directory, top_dir=None):
''' Remove directory and ancestor directories if they are empty
Parameters
----------
directory : str
The Directory
top_dir : str
Optional argument top_dir, prevents pruning below that directory
Raises
------
OSError
'''
#Fill out default value
if top_dir is None:
top_dir = root_dir(directory)
#Sanitize inputs, and clean up so == below works
directory = os.path.normpath(directory)
top_dir = os.path.normpath(top_dir)
#Sanity checks
assert os.path.isdir(directory)
assert os.path.isdir(top_dir)
assert is_subdir(directory, top_dir)
for x in range(100):
#prevent infinite loop, not that I think that's possible
if samefile(directory, top_dir):
#If reached top dir, stop!
break
try:
os.rmdir(directory)
except OSError as err:
if err.errno==39 or err.errno == 13:
break
raise err
directory = os.path.dirname(directory)
[docs]def find_file_in_path(fname, path=None):
"""
Find a file in any of the directories on the PATH.
Like distutils.spawn.find_executable, but works for
any file.
Parameters
----------
fname : str
The filename to search the PATH for.
path : str
An optional PATH string. If not provided, os PATH is used.
Returns
-------
str
The full path to the found file, or None if not found.
"""
if not path:
try:
path = os.environ['PATH']
except KeyError:
return None
for p in path.split(os.pathsep):
possible_file_location = os.path.join(p, fname)
if os.path.exists(possible_file_location):
return possible_file_location
return None
[docs]def is_dir_empty(path):
'''
Checks to see if a dir is empty. Much faster than ``os.listdir`` and
``os.walk``
'''
with os.scandir(path) as scandir:
for _ in scandir:
return False
return True