sortDataLoggerFiles.py
#!/usr/bin/env python
#
# Documentation can be rendered in a Python interactive session with
# `import sortDataLoggerFiles; help(sortDataLoggerFiles)` and on the command
# line with `sortDataLoggerFiles --help`.
#
# It may require ROOT.
#
# Changes:
# 20210411 (petrillo@slac.fnal.gov) [1.0]
#   first public version
# 20210518 (petrillo@slac.fnal.gov) [1.1]
#   added options for duplicate events
# 20210602 (petrillo@slac.fnal.gov) [1.2]
#   added optional stream name to the file name pattern;
#   fixed a bug where the first logger option value would be ignored
# 20220222 (petrillo@slac.fnal.gov) [1.3]
#   added support for a new file name format, and for multiple formats
#

import sys, os
import re
import logging

__doc__ = """Sorts a list of data logger output files.

File paths are read from all the specified file lists in sequence, or from
standard input if no file list is specified.

If a line is encountered that does not match the typical file name pattern,
that line is ignored and a warning is printed.

Comments and empty lines at the beginning of the first file list are printed
at the top of the output as they are. All other comments and empty lines are
printed at the end of the output.

Note that it is possible to sort "in place" by specifying the same file list as
input and output.

Duplicate files are files with the same run, data logger cycle and data logger
number. By default, only the first of the duplicate files is written into the
output list, and only the number of duplicates is printed. Options allow
writing a detailed list of duplicate files on screen or to disk, or skipping
the duplicate check altogether.

"""
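# Typical invocation (file names here are illustrative, not from the source):
#   sortDataLoggerFiles.py filelist.txt -o sorted.txt
# reads paths from filelist.txt and writes the sorted list into sorted.txt;
# with no argument, paths are read from standard input instead.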

__author__ = 'Gianluca Petrillo (petrillo@slac.stanford.edu)'
__date__ = 'February 22, 2022'
__version__ = '1.3'


class CycleCompareClass:
  """Provides less() to compare numbers with a single offset cycle.

  For example, with offset 3 the order of [0:20] would be, from the lowest:
  [ 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, ]
  """
  def __init__(self, first): self.first = first
  def less(self, a, b): return (a < b) == ((a < self.first) == (b < self.first))
# class CycleCompareClass
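# The single comparison in less() works because a plain `a < b` is correct
# whenever a and b fall on the same side of `first`, and inverted when they
# straddle it. An illustrative check with the offset-3 example above:
#   cmp = CycleCompareClass(first=3)
#   assert cmp.less(3, 0)       # 3 opens the cycle, 0 is in the wrapped tail
#   assert not cmp.less(2, 19)  # 2, in the tail, follows every unwrapped value
#   assert cmp.less(0, 1)       # within the tail the natural order applies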


class FileNameParser:
  """Static object (namespace?) containing file name parsing utilities.

  All supported file name patterns are in the `Patterns` class variable.
  The static method `match()` tries all of them, in order.
  """
  Patterns = [
    {
      'Name': 'general',
      # pattern groups: <1> data logger, <2> stream, <3> stream name,
      #                 <4> run, <5> pass, <6> filler (timestamp)
      'Pattern': re.compile(r"data_dl(\d+)(_fstrm([^_]*))?_run(\d+)_(\d+)_(.*)\.root"),
      'Parameters': {
        'DataLogger': ( 1, int ),
        'StreamName': ( 3, str ),
        'RunNumber' : ( 4, int ),
        'PassCount' : ( 5, int ),
      },
    }, # general
    {
      'Name': 'multistream',
      # pattern groups: <1> stream name, <2> data logger,
      #                 <3> run, <4> pass, <5> filler (timestamp)
      'Pattern': re.compile(r"([^_]+)_data_dl(\d+)_run(\d+)_(\d+)_(.*)\.root"),
      'Parameters': {
        'DataLogger': ( 2, int ),
        'StreamName': ( 1, str ),
        'RunNumber' : ( 3, int ),
        'PassCount' : ( 4, int ),
      },
    }, # multistream
  ] # Patterns
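  # Illustrative examples of names matched by each pattern (the first is from
  # the example path quoted in the main program below; the second is made up):
  #   general:     data_dl2_run4989_1_20210219T015125_20210219T200434-decode.root
  #                -> DataLogger=2, StreamName=None, RunNumber=4989, PassCount=1
  #   multistream: bnb_data_dl3_run8441_12_20220201T123456.root
  #                -> DataLogger=3, StreamName='bnb', RunNumber=8441, PassCount=12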

  class ParsedNameClass:
    def __init__(self, name, fields):
      self.name = name
      self.fields = fields
    # __init__()
    # note: this hook was spelled `__nonzero__` (the Python 2 name); under
    # Python 3, required by this script, it must be `__bool__` for bool()
    # (and hence `is_file` below) to work
    def __bool__(self): return len(self.fields) > 0
    def get(self, *fieldNames):
      return tuple(self.fields.get(fieldName, None) for fieldName in fieldNames)
  # class ParsedNameClass

  def __init__(self): pass

  @staticmethod
  def match(name):
    # the first successful pattern is used
    for patternInfo in FileNameParser.Patterns:
      match = patternInfo['Pattern'].match(name)
      if match is None: continue
      d = dict(
        ( name, ( type_(value) if (value := match.group(index)) else None ) )
        for name, ( index, type_ ) in patternInfo['Parameters'].items()
        )
      d['Name'] = patternInfo['Name']
      return FileNameParser.ParsedNameClass(name, d)
    else: return FileNameParser.ParsedNameClass(name, {})
  # match()
# class FileNameParser
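# Minimal usage sketch for the parser above (illustrative file name):
#   parsed = FileNameParser.match("data_dl2_run4989_1_x.root")
#   if parsed: dataLogger, run = parsed.get('DataLogger', 'RunNumber')
# Note: the assignment expression (`value := ...`) in match() requires
# Python 3.8 or newer.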


class FileInfoClass:
  """This class collects information about an input file, including a sorting
  criterion.
  """

  XRootDprotocolHead = 'root://fndca1.fnal.gov:1094/'
  XRootDprotocolDir = 'pnfs/fnal.gov/usr'
  POSIXprotocolHead = '/'
  POSIXprotocolDir = 'pnfs'

  POSIXPattern = re.compile(
    POSIXprotocolHead.replace('.', r'\.')
    + POSIXprotocolDir.replace('.', r'\.')
    + r"/([^/]+)/(.*)")
  XRootDPattern = re.compile(
    XRootDprotocolHead.replace('.', r'\.')
    + XRootDprotocolDir.replace('.', r'\.')
    + r"/(.*)"
    )
  _DataLoggerSorter = CycleCompareClass(first=4)

  @staticmethod
  def getFirstDataLogger(): return FileInfoClass._DataLoggerSorter.first
  @staticmethod
  def setFirstDataLogger(index):
    FileInfoClass._DataLoggerSorter = CycleCompareClass(first=index)

  def __init__(self,
               line: "input file line (should include endline)",
               source: "an arbitrary identifier to track the origin of the line" = None,
               ):
    """Constructor: use and parse the specified input file line."""
    self.line = line
    self.source = source
    self.path = line.strip()
    self.protocolAndDir, self.name = os.path.split(self.path)
    parsedName = FileNameParser.match(self.name)
    self.is_file = bool(parsedName)
    if self.is_file:
      self.dataLogger, self.run, self.stream, self.pass_ \
        = parsedName.get('DataLogger', 'RunNumber', 'StreamName', 'PassCount')
  # __init__()

  def __lt__(self, other):
    """Comparison: run, then pass, then (offset cycled) data logger number."""
    if not self.is_file:
      raise RuntimeError \
        ("Sorting not supported for non-file objects ('%s')" % self.path)
    # if
    if self.run < other.run: return True
    if self.run > other.run: return False

    if self.pass_ < other.pass_: return True
    if self.pass_ > other.pass_: return False

    if self.dataLogger != other.dataLogger:
      return \
        FileInfoClass._DataLoggerSorter.less(self.dataLogger, other.dataLogger)

    assert (self.stream is None) == (other.stream is None)
    return False if self.stream is None else self.stream < other.stream

  # __lt__()

  def __str__(self):
    s = f"Run {self.run} cycle {self.pass_} data logger {self.dataLogger}"
    if self.stream: s += f" stream {self.stream}"
    return s

  def pathToXRootD(self) -> "stored file path in XRootD format":
    if not self.is_file:
      raise RuntimeError(
        "XRootD conversion not supported for non-file objects ('%s')" % self.path
        )
    # if not file
    match = FileInfoClass.POSIXPattern.match(self.path)
    return os.path.join(
      FileInfoClass.XRootDprotocolHead, FileInfoClass.XRootDprotocolDir,
      *match.group(1, 2)
      ) if match else self.path

  # pathToXRootD()

  def pathToPOSIX(self) -> "stored file path in POSIX (PNFS local) format":
    if not self.is_file:
      raise RuntimeError(
        "POSIX conversion not supported for non-file objects ('%s')" % self.path
        )
    # if not file
    match = FileInfoClass.XRootDPattern.match(self.path)
    return os.path.join(
      FileInfoClass.POSIXprotocolHead, FileInfoClass.POSIXprotocolDir,
      match.group(1)
      ) if match else self.path

  # pathToPOSIX()

# class FileInfoClass
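# Illustrative round trip between the two path flavors handled above
# (the path is made up, following the patterns defined in the class):
#   POSIX:  /pnfs/icarus/scratch/some/dir/data_dl2_run4989_1_x.root
#   XRootD: root://fndca1.fnal.gov:1094/pnfs/fnal.gov/usr/icarus/scratch/some/dir/data_dl2_run4989_1_x.root
# pathToXRootD() maps the first form to the second, pathToPOSIX() the reverse;
# a path that does not match the expected pattern is returned unchanged.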


class MinimumAccumulator:
  def add(self, data, key = None):
    if key is None: key = data
    try:
      if key >= self.minKey: return False
    except AttributeError: pass # no self.minKey yet?
    self.minKey = key
    self.minData = data
    return True
  # add()
  def min(self): return self.minData
# class MinimumAccumulator
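# Minimal usage sketch (illustrative values):
#   lowest = MinimumAccumulator()
#   lowest.add("fileA", key=42)
#   lowest.add("fileB", key=7)
#   lowest.min()  # -> "fileB", the datum with the smallest key seen so far
# Note that min() raises AttributeError if add() was never called.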


def findFirstCycle(files, stream):
  firstLogger = None
  firstPassFiles = []
  wrapped = False
  for info in files:
    if info.stream != stream: continue
    if firstLogger == info.dataLogger: break # cycle completed
    if wrapped and info.dataLogger > firstLogger: break # cycle completed

    if firstLogger is None: firstLogger = info.dataLogger
    elif not wrapped and info.dataLogger < firstLogger: wrapped = True

    firstPassFiles.append(info)
    logging.debug("Added cycle %d logger %d stream %s to first cycle list",
                  info.pass_, info.dataLogger, info.stream)
  # for
  return firstPassFiles
# findFirstCycle()
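# Illustrative trace (made-up logger numbers): if the sorted files of a stream
# carry loggers [ 4, 5, 6, 0, 1, 2, 4, 5, ... ], the first cycle collected is
# [ 4, 5, 6, 0, 1, 2 ]: the wrap is detected at 0 (lower than the first
# logger, 4) and the second occurrence of 4 closes the cycle.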


def extractFirstEvent(filePath):
  try: import ROOT
  except ImportError:
    raise RuntimeError("""ROOT python module could not be loaded.
      In this condition, you'll have to skip the autodetection of the first logger
      by explicitly specifying its number as option to the script."""
      )
  # try ... except
  logging.debug("Opening '%s' for event number check...", filePath)
  srcFile = ROOT.TFile.Open(filePath, "READ")
  if not srcFile:
    raise RuntimeError \
      ("Failed to open '%s' for event number extraction." % filePath)
  #
  try: firstEvent = next(iter(srcFile.Events)) # go PyROOT
  except StopIteration:
    logging.debug("File '%s' appears to contain no events.", filePath)
    return None
  firstEventNumber = firstEvent.EventAuxiliary.event() # keep going PyROOT

  logging.debug("First event from '%s': %d", filePath, firstEventNumber)
  return firstEventNumber
# extractFirstEvent()


def detectFirstLogger(fileInfo):
  # in the end, we don't need a stream-aware algorithm to determine which
  # data logger received the first event, as long as we have all relevant
  # streams represented
  lowestEvent = MinimumAccumulator()
  for stream, files in fileInfo.items():
    if not len(files): continue
    for info in files:
      firstEvent = extractFirstEvent(info.pathToXRootD())
      if firstEvent is not None:
        lowestEvent.add(info, key=firstEvent)
        if firstEvent == 1: break # can't get lower than this!
    # for files
  # for
  try: firstLogger = lowestEvent.min().dataLogger
  except AttributeError:
    # this is in general a problem because it implies that we are failing to
    # correctly parse the list of input files
    raise RuntimeError("No data found for the first data logger pass.")
  logging.debug("Detected first logger: %d", firstLogger)
  return firstLogger
# detectFirstLogger()


def buildFileIndex(
    fileInfo: "list with information from all files",
    ) -> "a dictionary: { key -> list of files }":

  fileKey = lambda info: ( info.run, info.pass_, info.dataLogger, info.stream, )
  index = {}
  for info in fileInfo:
    index.setdefault(fileKey(info), []).append(info)
  return index
# buildFileIndex()
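# Illustrative shape of the returned index (made-up values); entries sharing
# run, pass, data logger and stream are the duplicates the main program hunts:
#   { (4989, 1, 2, None): [ infoA ],
#     (4989, 1, 3, None): [ infoB, infoC ] }   # infoC duplicates infoB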


if __name__ == "__main__":

  logging.basicConfig(level=logging.INFO)

  import argparse

  parser = argparse.ArgumentParser(description=__doc__)
  parser.set_defaults(skipDuplicates=True)

  parser.add_argument('inputFiles', nargs="*", metavar='inputFileNames',
    help='input file lists [one from stdin by default]')
  parser.add_argument('--firstlogger', type=int,
    help='index of the first data logger in the cycle')
  parser.add_argument('--output', '-o', default=None,
    help=
      'name of the file to write the resulting list into (overwritten!) [stdout]'
    )
  parser.add_argument('--nooutput', action="store_true",
    help='do not print the input files on screen nor write them to a file')

  duplGroup = parser.add_argument_group(title="duplicate file options")
  duplGroup.add_argument('--printduplicates', '-d', action="store_true",
    help='print duplicate files on screen')
  duplGroup.add_argument('--skipduplicates', '-S', dest='skipDuplicates',
    action="store_true",
    help='do not include duplicate files in the list (default)'
    )
  duplGroup.add_argument('--keepduplicates', '-K', dest='skipDuplicates',
    action="store_false",
    help='also include duplicate files in the list'
    )
  duplGroup.add_argument('--duplicatelist', '-D', type=str, default=None,
    help='name of a file list to be created with duplicate entries')

  parser.add_argument('--xrootd', '--root', '-X', action="store_true",
    help='convert the paths to XRootD URL')
  parser.add_argument('--posix', '-P', action="store_true",
    help='convert the paths to local POSIX path')
  parser.add_argument('--debug', action="store_true",
    help='print out debugging messages')
  parser.add_argument \
    ('--version', '-V', action='version', version='%(prog)s ' + __version__)

  args = parser.parse_args()

  if args.debug: logging.getLogger().setLevel(logging.DEBUG)

  if args.xrootd and args.posix:
    raise RuntimeError("XRootD and POSIX output format options are exclusive.")

  printDuplicates = args.printduplicates
  skipDuplicates = args.skipDuplicates
  makeDuplicateList = args.duplicatelist

  # "sources" are the input names ("<stdin>" when reading standard input)
  sources = args.inputFiles if args.inputFiles else [ "<stdin>" ]

  # "inputFiles" are all the files found in the sources
  inputFiles = (
    [ file_ ] if file_.endswith('.root') else open(file_, 'r')
    for file_ in args.inputFiles
    ) if args.inputFiles else [ sys.stdin, ]

  # example: /pnfs/icarus/persistent/users/ascarpel/trigger/4989/decoded/17247391_0/data_dl2_run4989_1_20210219T015125_20210219T200434-decode.root

  preComments = []
  postComments = []
  fileInfo = []
  sourceNames = []
  for iSource, file_ in enumerate(inputFiles):
    isSingleFile = isinstance(file_, list) and len(file_) <= 1
    for iLine, line in enumerate(file_):
      info = FileInfoClass(line, source=( iSource, None if isSingleFile else iLine + 1 ))
      if not info.is_file:
        if not info.path or info.path.startswith('#'):
          (postComments if fileInfo else preComments).append(info.line)
          continue
        else:
          logging.warning \
            ("Line %d ('%s') does not match file pattern." % (iLine + 1, info.path))
          continue
      # if not file
      fileInfo.append(info)
    # for line in file
  # for input files

  Streams = list(set( info.stream for info in fileInfo ))
  logging.debug("%d data files in %d streams: %s",
                len(fileInfo), len(Streams),
                ", ".join(stream if stream else "<none>" for stream in Streams)
                )

  if fileInfo and (args.firstlogger is None):
    # uses internal FileInfoClass ordering (firstLogger not set: any will do)
    fileInfo.sort()
    firstPassFiles = dict( ( stream, findFirstCycle(fileInfo, stream) )
                           for stream in Streams )
    assert firstPassFiles
    firstLogger = detectFirstLogger(firstPassFiles)
  else: firstLogger = args.firstlogger if args.firstlogger is not None else 4

  FileInfoClass.setFirstDataLogger(firstLogger)

  fileInfo.sort() # uses internal FileInfoClass ordering

  #
  # deal with duplicates
  #
  if printDuplicates or makeDuplicateList or skipDuplicates:
    nDuplicates = 0
    fileIndex = buildFileIndex(fileInfo)
    uniqueFiles = [] if skipDuplicates else None
    duplicateFiles = [] if makeDuplicateList else None
    # we rely on insertion-ordered dictionary guarantee of Python 3.7
    for fileList in fileIndex.values():
      mainInfo = fileList[0]
      if uniqueFiles is not None: uniqueFiles.append(mainInfo)
      if len(fileList) > 1:
        nDuplicates += len(fileList) - 1
        if duplicateFiles is not None: duplicateFiles.extend(fileList[1:])
        if printDuplicates:
          firstSource = mainInfo.source[0]
          msg = f"{mainInfo} with {len(fileList) - 1} duplicates of"

          if len(sources) > 1: msg += f" {sources[mainInfo.source[0]]}"
          if mainInfo.source[1] is not None: msg += f" line {mainInfo.source[1]}"
          msg += ":"
          for info in fileList[1:]:
            if info.source[0] != firstSource: msg += f" {sources[info.source[0]]}"
            if info.source[1] is not None: msg += f" line {info.source[1]}"
            msg += ";"
          # for
          logging.info(msg)
        # if print duplicates
      # if duplicates
    # for
    if nDuplicates: logging.info(f"Found {nDuplicates} duplicate files.")
    if duplicateFiles:
      with open(makeDuplicateList, 'w') as DuplicateListFile:
        for info in duplicateFiles: # lines still have their <CR>
          print(info.line, file=DuplicateListFile, end='')
      logging.info(f"{nDuplicates} duplicate file names written in '{makeDuplicateList}'.")
    # if we have duplicates and we write them
  # if print or store duplicates

  fileListContent = uniqueFiles if skipDuplicates else fileInfo


  #
  # print everything
  #

  # NOTE: keep this after all the input has been read,
  #       so that input files can be safely overwritten
  if not args.nooutput:
    outputFile = open(args.output, 'w') if args.output else sys.stdout

    # <CR> were not removed from `line`
    for line in preComments: outputFile.write(line)
    for info in fileListContent:
      if args.posix: line = info.pathToPOSIX() + '\n'
      elif args.xrootd: line = info.pathToXRootD() + '\n'
      else: line = info.line
      outputFile.write(line)
    for line in postComments: outputFile.write(line)

    if outputFile is not sys.stdout:
      logging.info \
        (f"{len(fileListContent)} file entries written into '{outputFile.name}'.")
      del outputFile
    # if
  else:
    logging.info(f"Found {len(fileListContent)} file entries.")
  # if ... else

  sys.exit(0)

# main