#!/usr/bin/python3 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . import locale import argparse import os import io import re import shutil from stat import S_ISDIR, S_ISREG import glob import sys import tempfile import warnings from warnings import warn from chardet.universaldetector import UniversalDetector VERSION = "1.8.0" def simple_warning(msg, cat, filename, lineno, file, line): print("\n{}: {}".format(parser.prog, msg), file=file) warnings.showwarning = simple_warning # Adapted from: https://stackoverflow.com/questions/24528278/stream-multiple-files-into-a-readable-object-in-python # Note: the original code is licensed under CC-BY-SA 3.0, which is # upwards-compatible with 4.0, and hence compatible with GPLv3. class ChainStream(io.RawIOBase): """ Chain an iterable of streams together into a single buffered stream. Usage: def generate_open_file_streams(): for file in filenames: yield open(file, 'rb') f = io.BufferedReader(ChainStream(generate_open_file_streams())) f.read() """ def __init__(self, streams): self.leftover = b'' self.stream_iter = iter(streams) try: self.stream = next(self.stream_iter) except StopIteration: self.stream = None def readable(self): return True def _read_next_chunk(self, max_length): # Return 0 or more bytes from the current stream, first returning all # leftover bytes. If the stream is closed returns b'' if self.leftover: return self.leftover elif self.stream is not None: return self.stream.read(max_length) else: return b'' def readinto(self, b): buffer_length = len(b) chunk = self._read_next_chunk(buffer_length) while len(chunk) == 0: # move to next stream if self.stream is not None: self.stream.close() try: self.stream = next(self.stream_iter) chunk = self._read_next_chunk(buffer_length) except StopIteration: # No more streams to chain together self.stream = None return 0 # indicate EOF output, self.leftover = chunk[:buffer_length], chunk[buffer_length:] b[:len(output)] = output return len(output) def slurp(filename): """Read a file into a string, aborting on error.""" try: return open(filename).read() except IOError as e: warn("Could not read file {}".format(filename)) sys.exit(os.EX_DATAERR) def get_files(filenames, verbose): """Yield (filename, permissions) pairs.""" for filename in filenames: try: perms = os.lstat(filename) except OSError as e: warn("Skipping {}: unable to read permissions ({})".format(filename, e)) continue if S_ISDIR(perms.st_mode): if verbose: warn("Skipping directory {}".format(filename)) continue elif S_ISREG(perms.st_mode): yield filename, perms else: warn("Skipping: {} (not a regular file)".format(filename)) def unescape(s): regex = re.compile(r'\\([0-7]{1,3}|x[0-9a-fA-F]{2}|[nrtvafb\\])') return regex.sub(lambda match: eval('"%s"' % match.group()), s) def casetype(string): # Starts with lower case case = 0 # Capitalized? if len(string) >= 1 and string[0].isupper(): case = 1 # All upper case? all_upper = True for i in range(1, len(string)): if not string[i].isupper(): all_upper = False break if all_upper: case = 2 return case def caselike(model, string): if len(string) > 0: case = casetype(model) if case == 1: string = string[0].upper() + string[1:] elif case == 2: string = string.upper() return string def replace(instream, outstream, regex, before, after, encoding, filename): patlen = len(before) sum = 0 tonext = u'' retry_prefix = b'' while True: block = retry_prefix + instream.read(io.DEFAULT_BUFFER_SIZE) if len(block) == 0: break try: err = None block = block.decode(encoding=encoding) retry_prefix = b'' except ValueError as e: # Try carrying invalid input over to next iteration in case it's # just incomplete err = e if e.start > 0: retry_prefix = block[e.start:] try: block = block[:e.start].decode(encoding=encoding) err = None except ValueError as e: err = e finally: if err is not None: if isinstance(err, UnicodeError): warn("{}: decoding error ({})".format(filename, err.reason)) else: warn("decoding error") warn("you can specify the encoding with --encoding") return 0 parts = regex.split(tonext + block) sum += len(parts) // (1 + regex.groups) tonext = parts[-1] or u'' results = [] for i in range(0, len(parts) - regex.groups, 1 + regex.groups): results.append(parts[i]) if parts[i + 1] != '': replace = re.sub(before, after, parts[i + 1]) if args.ignore_case == "match": replace = caselike(parts[i + 1], replace) results.append(replace) joined_parts = ''.join(results) outstream.write(joined_parts.encode(encoding=encoding)) outstream.write(tonext.encode(encoding=encoding)) return sum # Create command line argument parser. parser = argparse.ArgumentParser(description="Search and replace text in files.", formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--version', action='version', version="%(prog)s " + VERSION + ''' Copyright (C) 2004-2005 Göran Weinholt Copyright (C) 2004 Christian Häggström Copyright (C) 2016 Kevin Coyner Copyright (C) 2017 Jochen Kupperschmidt Copyright (C) 2018-2020 Reuben Thomas %(prog)s comes with ABSOLUTELY NO WARRANTY. You may redistribute copies of %(prog)s under the terms of the GNU General Public License. For more information about these matters, see the file named COPYING.''') parser.add_argument("--encoding", metavar="ENCODING", help="specify character set encoding") parser.add_argument("-i", "--ignore-case", action="store_true", help="search case-insensitively") parser.add_argument("-m", "--match-case", action="store_const", dest="ignore_case", const="match", help="ignore case when searching, but try to match case of replacement to case of original, either capitalized, all upper-case, or mixed") parser.add_argument("-w", "--whole-words", action="store_true", help="whole words (OLD-TEXT matches on word boundaries only)") parser.add_argument("-b", "--backup", action="store_true", help="rename original FILE to FILE~ before replacing") parser.add_argument("-q", "--quiet", action="store_true", help="quiet mode") parser.add_argument("-v", "--verbose", action="store_true", help="verbose mode") parser.add_argument("-s", "--dry-run", action="store_true", help="simulation mode") parser.add_argument("-e", "--escape", action="store_true", help="expand escapes in OLD-TEXT and NEW-TEXT [deprecated]") parser.add_argument("-F", "--fixed-strings", action="store_true", help="treat OLD-TEXT and NEW-TEXT as fixed strings, not regular expressions") parser.add_argument("--files", action="store_true", help="OLD-TEXT and NEW-TEXT are file names to read patterns from") parser.add_argument("--noglob", action="store_true", help="disable globbing and other expansions") parser.add_argument("-p", "--prompt", action="store_true", help="prompt before modifying each file") parser.add_argument("-f", "--force", action="store_true", help="ignore errors when trying to preserve permissions") parser.add_argument("-d", "--keep-times", action="store_true", help="keep the modification times on modified files") parser.add_argument('old_str', metavar='OLD-TEXT') parser.add_argument('new_str', metavar='NEW-TEXT') parser.add_argument('file', metavar='FILE', nargs='+') args = parser.parse_args() files = args.file # Perform globbing and tilde and variable expansion, if desired. if not args.noglob: globbed_files = [] for file in list(map(os.path.expanduser, map(os.path.expandvars, files))): globbed_files += glob.glob(file, recursive=True) files = globbed_files # See if all the files actually exist for file in files: if not os.path.exists(file): warn("File \"{}\" not found".format(file)) sys.exit(os.EX_DATAERR) if args.files: old_str = slurp(args.old_str) new_str = slurp(args.new_str) else: old_str = args.old_str new_str = args.new_str # Tell the user what is going to happen if not args.quiet: warn("{} \"{}\" with \"{}\" ({}; {})".format( "Simulating replacement of" if args.dry_run else "Replacing", old_str, new_str, "ignoring case" if args.ignore_case == True else ("matching case" if args.ignore_case == "match" else "case sensitive"), "whole words only" if args.whole_words else "partial words matched", )) if args.dry_run and not args.quiet: warn("The files listed below would be modified in a replace operation") encoding = None if args.encoding: encoding = args.encoding if args.escape: old_str = unescape(old_str) new_str = unescape(new_str) if args.fixed_strings: old_str = re.escape(old_str) new_str = new_str.replace('\\', r'\\') regex_str = old_str if args.whole_words: regex_str = r"\b" + regex_str + r"\b" # Call re.compile so we get an error if the regex is invalid, & count groups. regex = re.compile("(" + regex_str + ")", re.I if args.ignore_case else 0) total_files = 0 total_matches = 0 for filename, perms in get_files(files, args.verbose): total_files += 1 # If no encoding specified, reset guess for each file if not args.encoding: encoding = None # Open the input file try: f = open(filename, "rb") except IOError as e: warn("Skipping {}: cannot open for reading; error: {}".format(filename, e)) continue # Create the output file try: o, tmp_path = tempfile.mkstemp("", ".tmp.") o = os.fdopen(o, "wb") except OSError as e: warn("Skipping {}: cannot create temp file; error: {}".format(filename, e)) continue # Set permissions and owner try: os.chown(tmp_path, perms.st_uid, perms.st_gid) os.chmod(tmp_path, perms.st_mode) except OSError as e: warn("Unable to set owner/group/perms of {}; error: {}".format(filename, e)) if args.force: warn("New owner/group/perms may not match!\n") else: warn("Skipping {}!\n".format(filename)) os.unlink(tmp_path) continue if args.verbose and not args.dry_run: warn("Processing: {}".format(filename)) # If we don't have an explicit encoding, guess block = b'' if encoding is None: detector = UniversalDetector() scanned_bytes = 0 # Scan at most 1MB, so we don't give up too soon, but don't slurp a # large file. while scanned_bytes < 1024*1024: next_block = f.read(io.DEFAULT_BUFFER_SIZE) if len(next_block) == 0: break scanned_bytes += len(next_block) block += next_block detector.feed(next_block) if detector.done: break f = io.BufferedReader(ChainStream([io.BytesIO(block), f])) detector.close() if detector.done: encoding = detector.result['encoding'] if args.verbose: if encoding is not None: warn("Guessed encoding '{}'".format(encoding)) else: warn("Unable to guess encoding") if encoding is None: encoding = locale.getpreferredencoding(False) if args.verbose: warn("Could not guess encoding; using locale default '{}'".format(encoding)) # Do the actual work now matches = replace(f, o, regex, old_str, new_str, encoding, filename) f.close() o.close() if matches == 0: os.unlink(tmp_path) continue if args.dry_run: try: fn = os.path.realpath(filename) except OSError as e: fn = filename if not args.quiet: print(" {}".format(fn), file=sys.stderr) os.unlink(tmp_path) total_matches += matches continue if args.prompt: print("\nSave \"{}\"? ([Y]/N) ".format(filename), file=sys.stderr, end='') line = "" while line == "" or line[0] not in "Yy\nnN": line = input() if line[0] in "nN": print("Not saved", file=sys.stderr) os.unlink(tmp_path) continue print("Saved", file=sys.stderr) if args.backup: try: shutil.move(filename, filename + "~") except OSError as e: warn("Error renaming {} to {}:".format(filename, filename + "~", e)) continue # Rename the file try: shutil.move(tmp_path, filename) except OSError as e: warn("Could not replace {} with {}; error: {}".format(tmp_path, filename, e)) os.unlink(tmp_path) continue # Restore the times if args.keep_times: try: os.utime(filename, (perms.st_atime, perms.st_mtime)) except OSError as e: warn("Error setting timestamps of {}: {}".format(filename, e)) total_matches += matches # We're about to exit, give a summary if not args.quiet: warn("{} matches {} in {} file{}".format( total_matches, "found" if args.dry_run else "replaced", total_files, "s" if total_files != 1 else "", ))