diff options
Diffstat (limited to 'cvs2svn_lib/common.py')
-rw-r--r-- | cvs2svn_lib/common.py | 409 |
1 files changed, 409 insertions, 0 deletions
# (Be in -*- python -*- mode.)
#
# ====================================================================
# Copyright (c) 2000-2009 CollabNet. All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://subversion.tigris.org/license-1.html.
# If newer versions of this license are posted there, you may use a
# newer version instead, at your option.
#
# This software consists of voluntary contributions made by many
# individuals. For exact contribution history, see the revision
# history and logs, available at http://cvs2svn.tigris.org/.
# ====================================================================

"""This module contains common facilities used by cvs2svn."""


import re
import time
import codecs

from cvs2svn_lib.log import Log


# Always use these constants for opening databases.
DB_OPEN_READ = 'r'
DB_OPEN_WRITE = 'w'
DB_OPEN_NEW = 'n'


SVN_INVALID_REVNUM = -1


# Warnings and errors start with these strings. They are typically
# followed by a colon and a space, as in "%s: " ==> "WARNING: ".
warning_prefix = "WARNING"
error_prefix = "ERROR"


class FatalException(Exception):
    """Exception thrown on a non-recoverable error.

    If this exception is thrown by main(), it is caught by the global
    layer of the program, its string representation is printed (followed
    by a newline), and the program is ended with an exit code of 1."""

    pass


class InternalError(Exception):
    """Exception thrown in the case of a cvs2svn internal error (aka, bug)."""

    pass


class FatalError(FatalException):
    """A FatalException that prepends error_prefix to the message."""

    def __init__(self, msg):
        """Use (error_prefix + ': ' + MSG) as the error message."""

        FatalException.__init__(self, '%s: %s' % (error_prefix, msg,))


class CommandError(FatalError):
    """A FatalError caused by a failed command invocation.

    The error message includes the command name, exit code, and output.
    The constructor arguments are also kept on the instance as the
    attributes 'command', 'exit_status' and 'error_output'."""

    def __init__(self, command, exit_status, error_output=''):
        """Build the message from COMMAND, EXIT_STATUS and ERROR_OUTPUT.

        ERROR_OUTPUT is stored unmodified, but trailing whitespace is
        stripped from the copy that is embedded in the message. If
        nothing remains after stripping, the message says that there
        was no output."""

        self.command = command
        self.exit_status = exit_status
        self.error_output = error_output

        # Strip once; the result decides which message form is used and
        # is also the text that gets embedded.
        stripped_output = error_output.rstrip()
        if stripped_output:
            FatalError.__init__(
                self,
                'The command %r failed with exit status=%s\n'
                'and the following output:\n'
                '%s'
                % (self.command, self.exit_status, stripped_output))
        else:
            FatalError.__init__(
                self,
                'The command %r failed with exit status=%s and no output'
                % (self.command, self.exit_status))


def path_join(*components):
    """Join two or more pathname COMPONENTS, inserting '/' as needed.

    Empty components are skipped."""

    return '/'.join(filter(None, components))


def path_split(path):
    """Split the svn pathname PATH into a pair, (HEAD, TAIL).

    This is similar to os.path.split(), but always uses '/' as path
    separator. PATH is an svn path, which should not start with a '/'.
    HEAD is everything before the last slash, and TAIL is everything
    after. If PATH ends in a slash, TAIL will be empty. If there is no
    slash in PATH, HEAD will be empty. If PATH is empty, both HEAD and
    TAIL are empty."""

    pos = path.rfind('/')
    if pos == -1:
        return ('', path,)
    else:
        return (path[:pos], path[pos+1:],)


class IllegalSVNPathError(FatalException):
    """Raised by the path verification functions for an unusable path."""

    pass


# Control characters (characters not allowed in Subversion filenames).
# The pattern is a character class holding the range 0x00-0x1f plus
# 0x7f; each of those literal bytes is preceded by a regexp backslash.
ctrl_characters_regexp = re.compile('[\\\x00-\\\x1f\\\x7f]')


def verify_svn_filename_legal(filename):
    """Verify that FILENAME is a legal filename.

    FILENAME is a path component of a CVS path. Check that it won't
    choke SVN:

    - Check that it is not empty.

    - Check that it is not equal to '.' or '..'.

    - Check that the filename does not include any control characters.

    If any of these tests fail, raise an IllegalSVNPathError."""

    if filename == '':
        raise IllegalSVNPathError("Empty filename component.")

    if filename in ['.', '..']:
        raise IllegalSVNPathError(
            "Illegal filename component %r." % (filename,)
        )

    m = ctrl_characters_regexp.search(filename)
    if m:
        raise IllegalSVNPathError(
            "Character %r in filename %r is not supported by Subversion."
            % (m.group(), filename,)
        )


def verify_svn_path_legal(path):
    """Verify that PATH is a legitimate SVN path.

    PATH must not start with '/', and each of its '/'-separated
    components must pass verify_svn_filename_legal(). An empty PATH is
    accepted.

    If not, raise an IllegalSVNPathError."""

    if path.startswith('/'):
        raise IllegalSVNPathError(
            "Path %r must not start with '/'." % (path,)
        )

    # Peel components off the right-hand end until nothing is left.
    head = path
    while head != '':
        (head, tail) = path_split(head)
        try:
            verify_svn_filename_legal(tail)
        except IllegalSVNPathError as e:
            # Re-raise with the full path for context. The "as" form
            # needs Python 2.6 or later and is the only spelling that
            # Python 3 parsers accept.
            raise IllegalSVNPathError(
                'Problem with path %r: %s' % (path, e,)
            )


def normalize_svn_path(path, allow_empty=False):
    """Normalize an SVN path (e.g., one supplied by a user).

    1. Strip leading, trailing, and duplicated '/'.
    2. If ALLOW_EMPTY is not set, verify that PATH is not empty.

    Return the normalized path.

    If the normalized path is empty and ALLOW_EMPTY is not set, raise
    an IllegalSVNPathError. No other validity check is done here; use
    verify_svn_path_legal() for that."""

    # Splitting on '/' yields an empty string for every leading,
    # trailing or doubled slash, and path_join() drops empty components.
    norm_path = path_join(*path.split('/'))
    if not allow_empty and not norm_path:
        raise IllegalSVNPathError("Path is empty")
    return norm_path


class PathRepeatedException(Exception):
    """Records that PATH was supplied COUNT times.

    The values are available as the attributes 'path' and 'count'."""

    def __init__(self, path, count):
        self.path = path
        self.count = count
        Exception.__init__(
            self, 'Path %s is repeated %d times' % (self.path, self.count,)
        )


class PathsNestedException(Exception):
    """Records that the path NEST contains the paths in NESTLINGS.

    The values are available as the attributes 'nest' and 'nestlings'."""

    def __init__(self, nest, nestlings):
        self.nest = nest
        self.nestlings = nestlings
        Exception.__init__(
            self,
            'Path %s contains the following other paths: %s'
            % (self.nest, ', '.join(self.nestlings),)
        )


class PathsNotDisjointException(FatalException):
    """An exception that collects multiple other disjointness exceptions.

    The collected exceptions are available as the attribute
    'problems'."""

    def __init__(self, problems):
        self.problems = problems
        # Initialize through the direct base class, as FatalError does.
        FatalException.__init__(
            self,
            'The following paths are not disjoint:\n'
            ' %s\n'
            % ('\n '.join([str(problem) for problem in self.problems]),)
        )


def verify_paths_disjoint(*paths):
    """Verify that all of the paths in the argument list are disjoint.

    If any of the paths is nested in another one (i.e., in the sense
    that 'a/b/c/d' is nested in 'a/b'), or any two paths are identical,
    raise a PathsNotDisjointException containing exceptions detailing
    the individual problems."""

    def split(path):
        """Return the list of components of PATH ([] for an empty PATH)."""

        if not path:
            return []
        else:
            return path.split('/')

    def contains(split_path1, split_path2):
        """Return True iff SPLIT_PATH1 contains SPLIT_PATH2.

        That is, iff SPLIT_PATH1 is a proper prefix of SPLIT_PATH2."""

        return (
            len(split_path1) < len(split_path2)
            and split_path2[:len(split_path1)] == split_path1
        )

    paths = [(split(path), path) for path in paths]
    # If all overlapping elements are equal, a shorter list is
    # considered "less than" a longer one. Therefore if any paths are
    # nested, this sort will leave at least one such pair adjacent, in
    # the order [nest,nestling].
    paths.sort()

    problems = []

    # Create exceptions for any repeated paths, and delete the repeats
    # from the paths array:
    i = 0
    while i < len(paths):
        split_path, path = paths[i]
        j = i + 1
        while j < len(paths) and split_path == paths[j][0]:
            j += 1
        if j - i > 1:
            problems.append(PathRepeatedException(path, j - i))
            # Delete all but the first copy:
            del paths[i + 1:j]
        i += 1

    # Create exceptions for paths nested in each other:
    i = 0
    while i < len(paths):
        split_path, path = paths[i]
        j = i + 1
        while j < len(paths) and contains(split_path, paths[j][0]):
            j += 1
        if j - i > 1:
            problems.append(PathsNestedException(
                path, [path2 for (split_path2, path2) in paths[i + 1:j]]
            ))
        i += 1

    if problems:
        raise PathsNotDisjointException(problems)


def format_date(date):
    """Return an svn-compatible date string for DATE (seconds since epoch).

    A Subversion date looks like '2002-09-29T14:44:59.000000Z'. The
    time is rendered in UTC and the fractional part is always zero."""

    return time.strftime("%Y-%m-%dT%H:%M:%S.000000Z", time.gmtime(date))


class CVSTextDecoder:
    """Callable that decodes CVS strings into Unicode."""

    def __init__(self, encodings, fallback_encoding=None):
        """Create a CVSTextDecoder instance.

        ENCODINGS is a list containing the names of encodings that are
        attempted to be used as source encodings in 'strict' mode.

        FALLBACK_ENCODING, if specified, is the name of an encoding that
        should be used as a source encoding in lossy 'replace' mode if
        all of ENCODINGS failed.

        Raise LookupError if any of the specified encodings is unknown."""

        # Each entry is (encoding name, decode function); element 1 of
        # the value returned by codecs.lookup() is the decode function.
        self.decoders = [
            (encoding, codecs.lookup(encoding)[1])
            for encoding in encodings]

        if fallback_encoding is None:
            self.fallback_decoder = None
        else:
            self.fallback_decoder = (
                fallback_encoding, codecs.lookup(fallback_encoding)[1]
            )

    def add_encoding(self, encoding):
        """Add an encoding to be tried in 'strict' mode.

        ENCODING is the name of an encoding. If it is unknown, raise a
        LookupError. If an encoding of the same name is already
        registered, do nothing."""

        for (name, decoder) in self.decoders:
            if name == encoding:
                # Already registered under this name.
                return

        self.decoders.append((encoding, codecs.lookup(encoding)[1]))

    def set_fallback_encoding(self, encoding):
        """Set the fallback encoding, to be tried in 'replace' mode.

        ENCODING is the name of an encoding. If it is unknown, raise a
        LookupError. If ENCODING is None, remove the fallback
        encoding."""

        if encoding is None:
            self.fallback_decoder = None
        else:
            self.fallback_decoder = (encoding, codecs.lookup(encoding)[1])

    def __call__(self, s):
        """Try to decode string S using our configured source encodings.

        Return the string as a Unicode string. If S is already a unicode
        string, do nothing.

        The strict encodings are tried in the order in which they were
        configured; each failure is logged at verbose level.

        Raise UnicodeError if the string cannot be decoded using any of
        the source encodings and no fallback encoding was specified."""

        if isinstance(s, unicode):
            return s

        for (name, decoder) in self.decoders:
            try:
                return decoder(s)[0]
            except ValueError:
                Log().verbose(
                    "Encoding '%s' failed for string %r" % (name, s)
                )

        if self.fallback_decoder is not None:
            (name, decoder) = self.fallback_decoder
            return decoder(s, 'replace')[0]
        else:
            # Same exception type as before, but say what was tried so
            # that an uncaught failure can be diagnosed.
            raise UnicodeError(
                'String could not be decoded using any of the '
                'configured encodings (%s) and no fallback encoding '
                'is set'
                % (', '.join([name for (name, decoder) in self.decoders]),)
            )


class Timestamper:
    """Return monotonic timestamps derived from changeset timestamps."""

    def __init__(self):
        # The last timestamp that has been returned:
        self.timestamp = 0.0

        # The maximum timestamp that is considered reasonable (one day
        # after the time at which this object was created):
        self.max_timestamp = time.time() + 24.0 * 60.0 * 60.0

    def get(self, timestamp, change_expected):
        """Return a reasonable timestamp derived from TIMESTAMP.

        Push TIMESTAMP into the future if necessary to ensure that it is
        at least one second later than every other timestamp that has
        been returned by previous calls to this method.

        If TIMESTAMP is later than the maximum reasonable timestamp, it
        is replaced by the previously returned timestamp plus one
        second.

        If CHANGE_EXPECTED is not True, then log a message if the
        timestamp has to be changed.

        The returned value is also remembered as the last timestamp."""

        if timestamp > self.max_timestamp:
            # If a timestamp is in the future, it is assumed that it is
            # bogus. Shift it backwards in time to prevent it forcing
            # other timestamps to be pushed even further in the future.

            # Note that this is not nearly a complete solution to the
            # bogus timestamp problem. A timestamp in the future still
            # affects the ordering of changesets, and a changeset having
            # such a timestamp will not be committed until all
            # changesets with earlier timestamps have been committed,
            # even if other changesets with even earlier timestamps
            # depend on this one.
            self.timestamp = self.timestamp + 1.0
            if not change_expected:
                Log().warn(
                    'Timestamp "%s" is in the future; changed to "%s".'
                    % (time.asctime(time.gmtime(timestamp)),
                       time.asctime(time.gmtime(self.timestamp)),)
                )
        elif timestamp < self.timestamp + 1.0:
            # Too close to (or earlier than) the last returned value.
            self.timestamp = self.timestamp + 1.0
            if not change_expected and Log().is_on(Log.VERBOSE):
                Log().verbose(
                    'Timestamp "%s" adjusted to "%s" to ensure monotonicity.'
                    % (time.asctime(time.gmtime(timestamp)),
                       time.asctime(time.gmtime(self.timestamp)),)
                )
        else:
            self.timestamp = timestamp

        return self.timestamp