All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SerialSubstitution.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Run with `--help` for usage instructions
4 #
5 # Changes:
6 # 20200714 (petrillo@slac.stanford.edu) [v2.0]
7 # updated to Python 3
8 #
9 
10 __doc__ = "Performs hard-coded substitutions on all files in a directory."
11 __version__ = '2.0'
12 
13 import sys, os
14 import logging
15 import re
16 import shutil
17 import tempfile
18 
def ANSIcode(content):
    """Return the ANSI escape sequence whose parameter string is `content`."""
    return "".join(("\x1B[", content, "m"))


# escape sequences available for colouring the output
(
    ANSIReset,
    ANSIRed,
    ANSIGreen,
    ANSIBlue,
    ANSIBrightBlue,
    ANSIMagenta,
    ANSIYellow,
    ANSIWhite,
) = map(ANSIcode, ("0", "31", "32", "34", "1;34", "35", "1;33", "1;37"))
29 
30 ################################################################################
31 ### Library code
32 ###
def Colorize(msg, category, options):
    """Return `msg` as a string, coloured according to `category`.

    The plain string is returned when `options` is unset (or empty) or when
    `options.UseColor` is off. Otherwise the text is preceded by the entry of
    `options.Colors` for `category` (nothing if there is no such entry) and
    followed by the ANSI reset sequence.
    """
    if options and options.UseColor:
        return options.Colors.get(category, "") + str(msg) + ANSIReset
    return str(msg)
# Colorize()
37 
38 
40 
def __init__(self):
    """Create a context with no options attached."""
    self.options = None

def SetOptions(self, options):
    """Store `options` in the context."""
    self.options = options

def Location(self):
    """Return the description of the location; this context has none."""
    return "(no context)"

def __str__(self):
    """The string form of a context is its location."""
    return self.Location()
47 
48 # class ContextClass
49 
def __init__(self, filename, line_no):
    """Create a context pointing at line `line_no` of file `filename`."""
    self.filename = filename
    ContextClass.__init__(self)
    self.SetLine(line_no)
# __init__()
56 
def Location(self):
    """Return "<file name>@<line number>", each part passed through Colorize()."""
    source = Colorize(self.filename, 'source', self.options)
    line_no = Colorize(self.line_no, 'line_no', self.options)
    return "%s@%s" % (source, line_no)
62 
def SetLine(self, line_no):
    """Record `line_no` as the line this context points to."""
    self.line_no = line_no
64 
65 # class LineNoContextClass
66 
67 
69 
def __init__(self):
    """Create a substitution with no options attached."""
    self.options = None

def SetOptions(self, options):
    """Store `options` in the substitution."""
    self.options = options

def __str__(self):
    """Name of the substitution; this one does nothing."""
    return "<no substitution>"

def __call__(self, s, context = None):
    """Apply the substitution to the line `s`: here `s` is returned untouched."""
    return s

def describe(self):
    """Return a description of the substitution (its string form)."""
    return str(self)

def Describe(self):
    """Same as `describe()`, under the capitalised name used elsewhere.

    The other substitution classes of this file name this method `Describe`,
    and `Describe` is the name the processor looks up on each pattern.
    """
    return self.describe()
79 
80 # class SubstitutionClass
81 
82 
def __init__(self, match, replacement, exceptions = []):
    """Prepare the replacement of the regular expression `match`.

    `replacement` is what `re.sub()` puts in place of each match; `exceptions`
    are regular expressions: a line matching any of them is left untouched.
    """
    SubstitutionClass.__init__(self)
    self.regex = re.compile(match)
    self.repl = replacement
    self.exceptions = [ re.compile(exception) for exception in exceptions ]
# __init__()
90 
def __str__(self):
    """The substitution is named after its regular expression."""
    return self.regex.pattern

def __call__(self, s, context = None):
    """Return `s` with all the matches replaced.

    `s` itself is returned if any of the exception patterns is found in it.
    """
    if any(exception.search(s) is not None for exception in self.exceptions):
        return s
    return self.regex.sub(self.repl, s)
97 
def Describe(self):
    """Return a one-line description: pattern, replacement and kind."""
    details = (self.regex.pattern, self.repl)
    return "%r -> %r (regex)" % details
100 
101 # class RegExSubstitutionClass
102 
103 
def __init__(self, match, exceptions = []):
    """Prepare the removal of the lines matching regular expression `match`.

    `exceptions` are regular expressions: a line matching any of them is kept.
    """
    SubstitutionClass.__init__(self)
    self.regex = re.compile(match)
    self.exceptions = [ re.compile(exception) for exception in exceptions ]
# __init__()
110 
def __str__(self):
    """The substitution is named after its regular expression."""
    return self.regex.pattern

def __call__(self, s, context = None):
    """Return an empty list if the line `s` is to be removed, `s` otherwise.

    The line is removed when the pattern matches at its beginning and none of
    the exception patterns is found in it.
    """
    excepted = any(
        exception.search(s) is not None for exception in self.exceptions
    )
    if excepted or not self.regex.match(s):
        return s
    return []
# __call__
119 
def Describe(self):
    """Return a one-line description: pattern and kind."""
    return "%r (remove)" % self.regex.pattern
122 
123 # class RegExDeleteLineClass
124 
125 
def __init__(self, match, replacement, exceptions = []):
    """Prepare the replacement of the literal string `match`.

    `replacement` is the string put in place of each occurrence; `exceptions`
    are literal strings: a line containing any of them is left untouched.
    """
    SubstitutionClass.__init__(self)
    self.pattern = match
    self.repl = replacement
    # keep a private copy: storing the argument itself would make all the
    # objects created without exceptions share the one default list
    self.exceptions = list(exceptions)
# __init__()
133 
def __str__(self):
    """The substitution is named after the string it looks for."""
    return self.pattern

def __call__(self, s, context = None):
    """Return `s` with all the occurrences of the pattern replaced.

    `s` itself is returned if it contains any of the exception strings.
    """
    if any(exception in s for exception in self.exceptions):
        return s
    return s.replace(self.pattern, self.repl)
# __call__()
141 
def Describe(self):
    """Return a one-line description: pattern, replacement and kind."""
    details = (self.pattern, self.repl)
    return "%r -> %r (literal)" % details
144 
145 # class ReplacementClass
146 
147 
def __init__(self, match, message, exceptions = []):
    """Prepare a warning for the lines where `match` is found.

    `match` is either a regular expression string or an object with a
    `search` attribute (treated as an already compiled pattern). `message` is
    the template of the warning text; `exceptions` are regular expressions: a
    line matching any of them does not trigger the warning.
    """
    SubstitutionClass.__init__(self)
    is_compiled = hasattr(match, 'search')
    self.regex = match if is_compiled else re.compile(match)
    self.pattern = self.regex.pattern
    self.msg = message
    self.exceptions = [ re.compile(exception) for exception in exceptions ]
# __init__()
160 
def __str__(self):
    """The warning is named after its pattern."""
    return self.pattern

def __call__(self, s, context = None):
    """Emit a warning if the pattern is found in the line `s`; return `s`.

    No warning is emitted if any of the exception patterns is found in `s`.
    The message is expanded with the groups of the match and, if `context` is
    given, reported together with the location of the line.
    """
    if any(exception.search(s) is not None for exception in self.exceptions):
        return s

    match = self.regex.search(s)
    if match is None:
        return s

    msg = match.expand(self.msg)
    if context is not None:
        logging.warning(
            "From %s: %s\n => %s",
            context.Location(), Colorize(msg, 'warning', self.options), s
        )
    else:
        logging.warning(
            "From line '%s': %s", s,
            Colorize(msg, 'warning', self.options)
        )
    logging.debug("(pattern: %r on %r)", self.regex.pattern, s)
    return s
# __call__()
184 
def Describe(self):
    """Return a one-line description: pattern, message and kind."""
    details = (self.pattern, self.msg)
    return "%r -> %r (warning)" % details
187 
188 # class WarningClass
189 
190 
191 
192 
def __init__(self, name):
    """Create the processor `name`, with no options, file filters or patterns."""
    self.name = name
    self.options = None
    self.file_filters, self.patterns = [], []
# __init__()
202 
def SetOptions(self, options):
    """Store `options` and hand them to all the recorded patterns.

    Returns the processor itself.
    """
    self.options = options
    for subst in self.patterns:
        subst.SetOptions(options)
    return self
# SetOptions()
208 
def SetColors(self, **colors):
    """Add `colors` (category -> colour code) to the colours in the options.

    If that fails with an AttributeError (e.g. the options have no `Colors`
    yet), `colors` becomes the colour table.
    """
    try:
        palette = self.options.Colors
        palette.update(colors)
    except AttributeError:
        self.options.Colors = colors
# SetColors()
213 
def Color(self, msg, category):
    """Return `msg` coloured for `category` according to the current options."""
    return Colorize(msg, category, self.options)
215 
def RecordPattern(self, pattern):
    """Append `pattern` to the list of patterns, after giving it the options."""
    pattern.SetOptions(self.options)
    self.patterns += [ pattern ]
# RecordPattern()
220 
221 
def AddFilePattern(self, pattern):
    """Add a filter accepting the paths matching regular expression `pattern`.

    A `$` is appended to the pattern unless it already ends with one.
    Returns the processor itself.
    """
    anchored = pattern if pattern.endswith('$') else pattern + "$"
    self.file_filters.append(re.compile(anchored))
    return self
# AddFilePattern()
228 
def AddFileNamePattern(self, name_pattern):
    """Add a file filter for `name_pattern` preceded by any directories."""
    path_pattern = R"(.*/)*" + name_pattern
    return self.AddFilePattern(path_pattern)
231 
def AddFileType(self, *suffixes):
    """Add a file filter for each of the file name `suffixes`.

    Each suffix is used as part of a regular expression and is expected after
    a literal dot, at the end of the file name. Returns the processor itself.
    """
    for suffix in suffixes:
        # raw string: "\." is not a valid escape sequence in a plain literal
        self.AddFileNamePattern(r".*\." + suffix)
    return self
# AddFileType()
236 
237 
def AddRegExPattern(self, pattern, repl, exceptions = []):
    """Record a regular expression substitution; returns the processor."""
    subst = RegExSubstitutionClass(pattern, repl, exceptions)
    self.RecordPattern(subst)
    return self
# AddRegExPattern()
242 
def AddRegExRemoveLine(self, pattern, exceptions = []):
    """Record the removal of lines matching `pattern`; returns the processor."""
    subst = RegExDeleteLineClass(pattern, exceptions)
    self.RecordPattern(subst)
    return self
# AddRegExRemoveLine()
247 
def AddSimplePattern(self, pattern, repl, exceptions = []):
    """Record a literal string replacement; returns the processor."""
    subst = ReplacementClass(pattern, repl, exceptions)
    self.RecordPattern(subst)
    return self
# AddSimplePattern()
252 
def AddWord(self, word, repl, exceptions = []):
    """Record the substitution of `word` where delimited by word boundaries."""
    whole_word = r'\b' + word + r'\b'
    return self.AddRegExPattern(whole_word, repl, exceptions)
255 
def AddWarningPattern(self, pattern, msg, exceptions = []):
    """Record a warning for the lines matching `pattern`; returns the processor."""
    warning = WarningClass(pattern, msg, exceptions)
    self.RecordPattern(warning)
    return self
# AddWarningPattern()
260 
def AddPattern(self, pattern, repl, exceptions=[]):
    """Record a regular expression substitution (same as AddRegExPattern())."""
    processor = self.AddRegExPattern(pattern, repl, exceptions)
    return processor
263 
264 
def MatchFile(self, FilePath):
    """Return whether `FilePath` is accepted by the file filters.

    Any path is accepted when there are no filters; otherwise the path must
    match at least one of them.
    """
    if not self.file_filters:
        return True
    return any(
        file_filter.match(FilePath) is not None
        for file_filter in self.file_filters
    )
# MatchFile()
274 
def SubstituteLine(self, line, context = None):
    """Apply all the patterns, in order, to `line`.

    Returns the very same string object if no pattern changed the content of
    the line, the new string if the line was changed, or the list of lines
    replacing `line` (an empty one meaning that the line is removed).
    Each change is reported via `self.options.LogMsg()`; `context`, if given,
    is passed to the patterns and used to describe where the change happened.
    """
    if line is None: return line

    for subst in self.patterns:
        new_line = subst(line, context)
        # a pattern may hand back a new string object with unchanged content:
        # that is not a change, and the original object must be preserved
        if new_line is line or new_line == line: continue

        old_text = line.rstrip('\n')
        msg = " pattern '%s' matched" % subst
        if context is not None: msg += " at %s" % context
        msg += ":"
        if isinstance(new_line, str):
            msg += "\n OLD| " + self.Color(old_text, 'old')
            msg += "\n NEW| %s" % self.Color(new_line.rstrip('\n'), 'new')
        elif not new_line:
            msg += "\n DEL| %s" % self.Color(old_text, 'old')
        else:
            msg += "\n OLD| " + self.Color(old_text, 'old')
            for l in new_line:
                msg += "\n NEW| %s" % self.Color(l.rstrip('\n'), 'new')
        # if ... else
        self.options.LogMsg(msg)

        # if the result is not a single line, we interrupt here;
        # no particular reason, but we don't need a more complex behaviour
        if not isinstance(new_line, str): return new_line

        line = new_line
    # for
    return line
# SubstituteLine()
308 
def ProcessFile(self, FilePath):
    """Apply the substitutions to a file; returns whether any line changed.

    Nothing is done, and False is returned, if no pattern is recorded or if
    the path is not accepted by MatchFile(). The file is rewritten only if
    `self.options.DoIt` is set; otherwise changes are only detected.
    """
    if not self.patterns: return False

    # filter by file name/path
    if not self.MatchFile(FilePath): return False

    # replace in memory, line by line
    context = LineNoContextClass(FilePath, 0)
    context.SetOptions(self.options)
    Content = []
    nChanges = 0
    # the source file is closed also if a substitution raises an exception
    with open(FilePath, 'r') as SourceFile:
        for iLine, line in enumerate(SourceFile, 1):
            context.SetLine(iLine)
            new_line = self.SubstituteLine(line, context)
            if new_line is line: # no change
                Content.append(line)
                continue
            if isinstance(new_line, str):
                Content.append(new_line)
            elif new_line: # expects a list or None
                Content.extend(new_line)
            # if .. else
            nChanges += 1
        # for
    # with

    # if substitutions have not been performed, return
    if nChanges == 0:
        logging.debug("No changes in '%s'.", FilePath)
        return False
    logging.debug("%d lines changed in '%s'.", nChanges, FilePath)

    if self.options.DoIt:
        # create the new file (closed also if writing fails)
        OutputFile = ProcessorClass.CreateTempFile(FilePath)
        OutputPath = OutputFile.name
        with OutputFile:
            OutputFile.write("".join(Content))
        shutil.copymode(FilePath, OutputPath)

        # if we are still alive, move the new file in place of the old one
        shutil.move(OutputPath, FilePath)
    # if

    return True
# ProcessFile()
360 
def ProcessFiles(self, *files):
    """Process each of `files`; returns how many ProcessFile() reported changed."""
    return sum(1 for FilePath in files if self.ProcessFile(FilePath))
# ProcessFiles()
368 
def ProcessDir(self, DirPath):
    """Returns the number of files processor actually acted on.

    `DirPath` may be a directory, which is walked recursively, or anything
    else, which is processed as a single file.
    """
    if not os.path.isdir(DirPath):
        if not self.ProcessFile(DirPath):
            return 0
        ApplyChangesMsg = "changed" if self.options.DoIt else "would change"
        logging.info("Processor '%s' %s file '%s'",
          self.name, ApplyChangesMsg, DirPath
          )
        return 1
    # if not a directory

    nActions = 0
    for dirpath, _dirnames, filenames in os.walk(DirPath):
        nChanged = self.ProcessFiles(
          *(os.path.join(dirpath, filename) for filename in filenames)
          )
        if nChanged > 0:
            logging.debug(" processor '%s' changed %d files in '%s'",
              self.name, nChanged, dirpath
              )
        nActions += nChanged
    # for
    if nActions > 0:
        ApplyChangesMsg = "changed" if self.options.DoIt else "would change"
        logging.info("Processor '%s' %s %d files in '%s'",
          self.name, ApplyChangesMsg, nActions, DirPath
          )
    return nActions
# ProcessDir()
401 
def __str__(self):
    """The processor prints as its name."""
    return self.name
403 
def Describe(self):
    """Return a list of lines describing the processor and its patterns.

    Each pattern is described by its Describe() method; if that fails with an
    AttributeError (e.g. there is no such method) its string form is used,
    and if it fails with any other error its `repr()` is.
    """
    output = [
      "Processor '%s' applies %d substitutions" % (self, len(self.patterns))
      ]
    for subst in self.patterns:
        try:
            description = " " + subst.Describe()
        except AttributeError:
            description = " " + str(subst)
        except Exception:
            # not a bare `except`: requests to exit or interrupt the program
            # must not be absorbed here
            description = " " + repr(subst)
        output.append(description)
    # for
    return output
# Describe()
417 
@staticmethod
def CreateTempFile(FilePath):
    """Create and open for writing a temporary file for the new `FilePath`.

    The file is created in the default temporary directory with a unique
    name, ending with the base name of `FilePath` followed by ".tmp". A fixed
    name would make files with the same base name (or concurrent runs) write
    into each other, and would follow a link planted under that name.
    The returned object is open in text write mode and has the path of the
    file in its `name` attribute; the file is not removed when closed.
    """
    return tempfile.NamedTemporaryFile(
      mode='w',
      prefix=tempfile.gettempprefix() + "-",
      suffix="-" + os.path.basename(FilePath) + ".tmp",
      delete=False,
      )
# CreateTempFile()
427 
428 # class ProcessorClass
429 
430 
431 
def __init__(self):
    """Create an empty list of processors, with no options attached."""
    self.processors = []
    self.options = None
# __init__()
437 
def __iter__(self):
    """Iterate through the processors, in order."""
    return iter(self.processors)

def __len__(self):
    """Number of processors in the list."""
    return len(self.processors)
440 
def SetOptions(self, options):
    """Store `options` and hand them to each of the processors."""
    self.options = options
    for each in self:
        each.SetOptions(options)
444 
def SetColors(self, **colors):
    """Pass the `colors` (category -> colour code) to each of the processors."""
    for each in self:
        each.SetColors(**colors)
447 
def SelectProcessors(self, ProcessorNames):
    """Keep only the processors named in `ProcessorNames`, in that order.

    Nothing changes if `ProcessorNames` is None. A RuntimeError is raised,
    and the list is left as it was, if a name matches no processor.
    """
    if ProcessorNames is None: return
    selected = []
    for ProcessorName in ProcessorNames:
        # the first processor with that name, if any
        found = next(
          (proc for proc in self.processors if proc.name == ProcessorName),
          None
          )
        if found is None:
            raise RuntimeError \
              ("Unknown processor '%s' selected" % ProcessorName)
        selected.append(found)
    # for processor names
    self.processors = selected
# SelectProcessors()
463 
def ProcessDir(self, DirPath):
    """Run all the processors on `DirPath`; returns the sum of their counts."""
    ApplyChangesMsg = "changed" if self.options.DoIt else "would be changed"
    nChanged = sum(processor.ProcessDir(DirPath) for processor in self)
    logging.info("%d file %s under '%s'", nChanged, ApplyChangesMsg, DirPath)
    return nChanged
# ProcessDir()
471 
def AddProcessor(self, processor):
    """Append `processor` to the list and return it."""
    self.processors += [ processor ]
    return processor
# AddProcessor()
476 
def Describe(self):
    """Return a list of lines describing all the processors in the list."""
    output = [ "There are %d processors in queue" % len(self) ]
    output += [ line for processor in self for line in processor.Describe() ]
    return output
# Describe()
483 
484 
485 # class ProcessorsList
# the list of processors shared by the whole module
ProcessorsList.Global = ProcessorsList()


def AddProcessor(processor):
    """Add `processor` to the global list of processors and return it."""
    global_list = ProcessorsList.Global
    return global_list.AddProcessor(processor)
491 
492 
def LoggingSetup(LoggingLevel = logging.INFO):
    """Configure the root logger: threshold `LoggingLevel`, "LEVEL: message"."""
    logging.basicConfig(
      format="%(levelname)s: %(message)s",
      level=LoggingLevel,
      )
# def LoggingSetup()
501 
502 ################################################################################
def RunSubstitutor():
    """Parse the command line and run the global processors; returns 0.

    With `--list` the processors are only described, and the program exits.
    Otherwise every input path (the current directory if none is given) is
    processed by all the selected processors.
    """
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument("InputDirs", nargs="*", action="store",
      help="input directories [current]")

    # on/off switches: option names, destination and help message
    switches = (
      ( ('--doit', ), "DoIt",
        "perform the substitutions [%(default)s]" ),
      ( ('--verbose', '-v'), "DoVerbose",
        "shows all the changes on screen [%(default)s]" ),
      ( ('--debug', ), "DoDebug",
        "enables debug messages on screen" ),
      ( ('--color', '-U'), "UseColor",
        "enables coloured output [%(default)s]" ),
      ( ('--list', ), "DoList",
        "just prints the hard-coded substitutions for each processor" ),
      )
    for names, dest, help_msg in switches:
        parser.add_argument(*names, dest=dest, action='store_true', help=help_msg)

    parser.add_argument('--only', dest="SelectedProcessors", action='append',
      help="executes only the processors with the specified name (see --list)")
    parser.add_argument('--version', action='version',
      version='%(prog)s ' + __version__)

    arguments = parser.parse_args()

    # set up the logging system
    LoggingSetup(logging.DEBUG if arguments.DoDebug else logging.INFO)

    # changes are reported at `info` level only on request
    arguments.LogMsg = logging.info if arguments.DoVerbose else logging.debug

    Processors = ProcessorsList.Global # use the global list

    Processors.SetOptions(arguments)
    Processors.SetColors(
      old=ANSIRed, new=ANSIGreen, source=ANSIWhite, line_no=ANSIMagenta,
      warning=ANSIYellow
      )
    if arguments.SelectedProcessors:
        Processors.SelectProcessors(arguments.SelectedProcessors)

    if arguments.DoList:
        logging.info("\n".join(Processors.Describe()))
        sys.exit(0)
    # if

    if not arguments.InputDirs:
        arguments.InputDirs = [ '.' ]

    for InputPath in arguments.InputDirs:
        Processors.ProcessDir(InputPath)

    return 0
# RunSubstitutor()
559 
560 
561 ################################################################################
if __name__ == "__main__":

    #############################################################################
    # Test: one processor with a regular expression and a literal substitution
    #
    subst = AddProcessor(ProcessorClass("subst"))
    subst.AddPattern(r"[^\w]", r"_").AddSimplePattern("A", "a")

    exit_code = RunSubstitutor()
    sys.exit(exit_code)
# main
def Colorize
Library code.
auto enumerate(Iterables &&...iterables)
Range-for loop helper tracking the number of iteration.
Definition: enumerate.h:69
S join(S const &sep, Coll const &s)
Returns a concatenation of strings in s separated by sep.
list
Definition: file_to_url.sh:28
open(RACETRACK) or die("Could not open file $RACETRACK for writing")