CheckPhotonLibraryJobs.py
1 #!/usr/bin/env python
2 #
3 # Run with `--help` for terse help.
4 #
5 # Changes:
6 # 20201216 (petrillo@slac.stanford.edu) [v2.1]
7 # way faster if not collecting output files and using existing good job list;
8 # keyboard interruption informs that the file lists are unchanged.
9 #
10 
11 __doc__ = """Checks the output of the jobs specified by their XML configuration file.
12 
13 The jobs must have been submitted by `project.py` and must have completed already.
14 
15 """
16 __version__ = "2.1"
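# --- Editor's note (not part of the original script): a sketch of a typical
# invocation, using options defined in the argument parser below; the file
# names are hypothetical.
#
#   CheckPhotonLibraryJobs.py myjobs_xml.list --goodlist good-jobs.list \
#     --badlist bad-jobs.list --maxjobs 10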
17 
18 import sys, os
19 import re
20 import time
21 import logging
22 
23 
24 def removeSuffixes(
25  s: "string to be processed",
26  *suffixes: "strings to be removed from the end of `s`; better longer first",
27  ) -> "a copy of `s` with all suffixes removed from its end":
28  while True:
29  for suffix in suffixes:
30  if not s.endswith(suffix): continue
31  s = s[:-len(suffix)]
32  break # bash would say `continue 2`
33  else: return s
34  # while
35 # removeSuffixes()
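# --- Editor's note (not part of the original script): a minimal sketch of
# `removeSuffixes()`: suffixes are stripped repeatedly until none matches,
# which is why listing longer suffixes first gives more predictable results.
assert removeSuffixes("prodjob_xml.list", ".list", "_xml") == "prodjob"
assert removeSuffixes("photonlib-xml", "_xml", "-xml") == "photonlib"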
36 
37 
38 class CachedValue:
39  """On the first call, it creates and returns an object.
40  On next calls, it returns the same object.
41  """
42  def __init__(self, fetchProc): self.fetchProc = fetchProc
43  def __call__(self, *args, **kwargs) -> "Returns the cached value.":
44  try: return self.cachedValue
45  except AttributeError: return self._fetchValue(*args, **kwargs)
46  # __call__()
47  def __bool__(self) -> "Returns whether the object is cached":
48  return hasattr(self, "cachedValue")
49  def _fetchValue(self, *args, **kwargs):
50  self.cachedValue = self.fetchProc(*args, **kwargs)
51  del self.fetchProc # not needed any more
52  return self.cachedValue
53  # _fetchValue()
54 # CachedValue
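# --- Editor's note (not part of the original script): a minimal sketch of
# `CachedValue`: the wrapped callable runs only once, and every later call
# returns the value stored on the first call.
_cachedExample = CachedValue(lambda: os.urandom(4))
assert _cachedExample() == _cachedExample()  # same cached bytes both times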
55 
56 
57 class JobIDclass:
58  """Class parsing a job ID."""
59 
60  class InvalidJobID(RuntimeError): pass
61 
62  JobIDpattern = re.compile(r'^([0-9]+)\.([0-9]+)@(.*)$')
63 
64  def __init__(self, jobIDstring): self.parse(jobIDstring)
65 
66  def parse(self, jobIDstring):
67  res = JobIDclass.JobIDpattern.match(jobIDstring)
68  if not res: raise JobIDclass.InvalidJobID(jobIDstring)
69  self._jobNo = int(res.group(1))
70  self._subjobIndex = int(res.group(2))
71  self._server = res.group(3)
72  return self
73  # parse()
74 
75  def jobNo(self): return self._jobNo
76  def subjobIndex(self): return self._subjobIndex
77  def server(self): return self._server
78  def subjobID(self): return str(self.jobNo()) + "." + str(self.subjobIndex())
79  def jobID(self): return self.subjobID() + "@" + self.server()
80  def subjobTag(self): return str(self.jobNo()) + "_" + str(self.subjobIndex())
81 
82  def __str__(self): return self.jobID()
83  def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.jobID())
84 
85 # class JobIDclass
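# --- Editor's note (not part of the original script): a minimal sketch of
# `JobIDclass` parsing a jobsub-style job ID (the ID below is made up).
_exampleJobID = JobIDclass("12345678.3@jobsub01.fnal.gov")
assert _exampleJobID.jobNo() == 12345678
assert _exampleJobID.subjobTag() == "12345678_3"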
86 
87 
88 class FileListIterator:
89  """Iterator: returns one file name at each iteration.
90 
91  The text file is expected to have one file name per line. Lines are stripped
92  of leading and trailing whitespace (`str.strip()`).
93  Empty lines and lines which are a comment are skipped.
94  A comment line is a line whose first non-blank character is a hash character
95  ('#').
96 
97  The value returned at each iteration is (lineNo, fileNo, fileName); if
98  `withLineNo` or `withFileNo` is `False`, the respective element is omitted.
99  If only the file name is requested (all other options `False`), the return value
100  is not a tuple but just the file name.
101  """
102  def __init__(self,
103  # /, # Python 3.8 only
104  listFile: "text file containing the list" = None,
105  listName: "path of the text file containing the list" = None,
106  withLineNo: "whether to return also the number of line in file" = False,
107  withFileNo: "whether to return also the number of file" = False,
108  ):
109  assert listFile or listName
110 
111  if not listFile: listFile = open(listName, 'r')
112  self.fileIter = iter(listFile)
113  self.lineNo = 0 if withLineNo else None
114  self.fileNo = 0 if withFileNo else None
115  # __init__()
116 
117  def __iter__(self): return self
118 
119  def __next__(self):
120  while True:
121  fileName = next(self.fileIter).strip()
122  if self.lineNo is not None: self.lineNo += 1
123  if not fileName or fileName[0] == '#': continue
124 
125  if self.fileNo is not None: self.fileNo += 1
126  if not self.fileNo and not self.lineNo: return fileName
127  else: return tuple(filter(None, ( self.lineNo, self.fileNo, fileName, )))
128  # while
129  # next()
130 
131 # class FileListIterator
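# --- Editor's note (not part of the original script): a minimal sketch of
# `FileListIterator` reading an in-memory file; comment and empty lines are
# skipped, and with `withLineNo=True` each item is a (lineNo, fileName) pair.
import io  # needed only for this example
_exampleList = io.StringIO("# comment\n\nfileA.root\nfileB.root\n")
assert list(FileListIterator(listFile=_exampleList, withLineNo=True)) \
  == [ (3, "fileA.root"), (4, "fileB.root") ]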
132 
133 
134 class JobChecker:
135  """Performs checks on job output and collects output files.
136 
137  """
138 
139  def __init__(self,
140  baseName: "name used for the check (defaults are shaped after it)",
141  goodList: "name of good job list (None: automatic; False: no list)" = None,
142  badList: "name of bad job list (None: automatic; False: no list)" = None,
143  fileList: "name of output file list (None: automatic; False: no list)" = None,
144  skipKnownGoodJobs: "do not check the jobs already in the good list" = False,
145  skipKnownBadJobs: "do not check the jobs already in the bad list" = False,
146  ):
147 
148  self.checkBaseDir = os.path.dirname(baseName)
149  # remove suffix:
150  self.checkBaseName = removeSuffixes(
151  os.path.splitext(os.path.basename(baseName))[0], # start with basename
152  '_xml', '-xml'
153  )
154 
155  self.goodListName, self.goodList, self.knownGoodJobs \
156  = self.setupList(goodList, 'goodxml', mustExist=skipKnownGoodJobs)
157  self.badListName, self.badList, self.knownBadJobs \
158  = self.setupList(badList, 'badxml', mustExist=skipKnownBadJobs)
159  self.outputFileListName, self.outputFileList, _ \
160  = self.setupList(fileList, 'outputfile')
161 
162  if not skipKnownGoodJobs: self.knownGoodJobs = set()
163  if not skipKnownBadJobs: self.knownBadJobs = set()
164 
165  # __init__()
166 
167 
168  def setupList(self, listName, listTag, mustExist = False):
169  """Returns the name of the list, an empty list to be filled and the existing
170  content.
171  """
172 
173  if listName is False: # set up for no list at all
174  return None, [], set()
175 
176  if listName is None:
177  listPath = os.path.join \
178  (self.checkBaseDir, self.checkBaseName + "-" + listTag + ".list")
179  elif not os.path.dirname(listName):
180  listPath = os.path.join(self.checkBaseDir, listName)
181  else: listPath = listName
182 
183  if os.path.isfile(listPath):
184  listContent = set(FileListIterator(listName=listPath))
185  (logging.info if mustExist else logging.debug) \
186  ("File list %s contains already %d entries.", listPath, len(listContent))
187  elif mustExist:
188  raise RuntimeError("File list '{}' ({} list) is required to exist."
189  .format(listPath, listTag))
190  else: listContent = set()
191 
192  return listPath, [], listContent
193  # setupList()
194 
195 
196  def reset(self):
197  """Resets all the counters and records."""
198 
199  self.goodList = []
200  self.badList = []
201  self.outputFileList = []
202 
203  # reset()
204 
205  def isCollectingOutput(self) -> "if the output file list is being filled":
206  return self.outputFileListName is not None
207 
208  def checkFromFile(self,
209  XMLfilePath: "path to the file containing XML job configuration",
210  projectName: "target the specified project name in the file" = "",
211  stageName: "target the specified stage name in the file" = "",
212  maxJobs: "if not None, process at most this many jobs" = None
213  ):
214 
215  nJobs = 0
216  for lineNo, fileName in FileListIterator(listName=XMLfilePath, withLineNo=True):
217 
218  if maxJobs is not None and nJobs >= maxJobs:
219  logging.info("Maximum number of jobs checked (%d).", maxJobs)
220  break
221  # if
222  nJobs += 1
223 
224  try:
225  self.checkJob(fileName, projectName=projectName, stageName=stageName)
226  except KeyboardInterrupt: raise
227  except Exception as e:
228  logging.error("Error processing job '%s' (file list line %d): %s",
229  fileName, lineNo, e)
230  # try ... except
231 
232  # for file line
233  # checkFromFile()
234 
235 
236  def checkJob(self,
237  XMLfilePath: "path to the file containing XML job configuration",
238  projectName: "target the specified project name in the file" = "",
239  stageName: "target the specified stage name in the file" = "",
240  ):
241 
242  if not os.path.isfile(XMLfilePath):
243  raise RuntimeError("Can't open file '{}'.".format(XMLfilePath))
244 
245  # jobInfo is a lazy callable returning the job information:
246  # information will be extracted only the first time `jobInfo()` is executed
247  jobInfo = CachedValue(lambda: self.getJobInfo \
248  (XMLfilePath, projectName=projectName, stageName=stageName))
249 
250  XMLfileDir, XMLfileName = os.path.split(XMLfilePath)
251 
252  if self.knownGoodJobs and (XMLfilePath in self.knownGoodJobs):
253  logging.info("%s: known as good, check skipped.", XMLfileName)
254  good = True
255  elif self.knownBadJobs and (XMLfilePath in self.knownBadJobs):
256  logging.info("%s: known as bad, check skipped.", XMLfileName)
257  good = False
258  else:
259  good = self.checkJobGoodness(jobInfo(), XMLfilePath)
260  #
261 
262  if good:
263  self.goodList.append(XMLfilePath)
264  if self.isCollectingOutput(): self.collectJobOutputFiles(jobInfo())
265  #
266  else:
267  self.badList.append(XMLfilePath)
268 
269  # checkJob()
270 
271 
272  def getJobInfo(self,
273  jobConfigFile: "path to the file containing XML job configuration",
274  projectName: "target the specified project name in the file" = "",
275  stageName: "target the specified stage name in the file" = "",
276  ):
277 
278  #
279  # get the project parsed by project.py
280  #
281  projInfo = project.get_project \
282  (jobConfigFile, projectname=projectName, stagename=stageName)
283 
284  if not projInfo: # this message should be improved...
285  raise RuntimeError("Job '{}' does not have project {} stage {}".format(
286  jobConfigFile, repr(projectName), repr(stageName)
287  ))
288  #
289 
290  stageInfo = \
291  next(filter(lambda stage: stage.name == stageName, projInfo.stages), None) \
292  if stageName else projInfo.stages[0]
293 
294  if not stageInfo:
295  raise RuntimeError("Job '{}' project {} does not have a stage {}".format(
296  jobConfigFile, repr(projInfo.name), repr(stageName)
297  ))
298  #
299 
300  return stageInfo
301  # getJobInfo()
302 
303 
304  def checkJobGoodness(self, jobInfo, jobName):
305 
306  class JobCheckError(RuntimeError): pass
307 
308  try:
309  outputDir = jobInfo.outdir
310  logging.debug("Job '%s' output directory: '%s'", jobName, outputDir)
311 
312  if not os.path.isdir(outputDir):
313  raise JobCheckError("no output directory present ('{}')".format(outputDir))
314 
315  if not os.path.exists(os.path.join(outputDir, 'checked')):
316  raise JobCheckError("not checked (run `project.py --checkana` first)")
317 
318  for jobID in map(JobIDclass, open(os.path.join(outputDir, 'jobids.list'), 'r')):
319  logging.debug("Checking subjob '%s'", jobID)
320 
321  subjobDir = os.path.join(outputDir, jobID.subjobTag())
322  logging.debug("Subjob '%s' output directory: '%s'", jobID, subjobDir)
323  if not os.path.isdir(subjobDir):
324  raise JobCheckError("job %s missing output directory" % jobID)
325 
326  statusFile = os.path.join(subjobDir, 'larStage0.stat')
327  if not os.path.isfile(statusFile):
328  raise JobCheckError("job %s missing status file" % jobID)
329 
330  try:
331  status = int(open(statusFile, 'r').readline().strip())
332  except KeyboardInterrupt: raise
333  except Exception as e:
334  raise JobCheckError("job %s failed reading status file '%s': %s"
335  % (jobID, statusFile, e))
336  #
337 
338  if status != 0:
339  raise JobCheckError("job %s exited with error code %d" % (jobID, status))
340 
341  # for subjob
342 
343  expectedOutputFileList = os.path.join(outputDir, 'filesana.list')
344  if not os.path.exists(expectedOutputFileList):
345  raise JobCheckError("no output file list ('%s')" % expectedOutputFileList)
346 
347  expectedOutputFiles = list(FileListIterator(listName=expectedOutputFileList))
348  if len(expectedOutputFiles) == 0:
349  raise JobCheckError("job has no output file")
350 
351  foundOutputFiles = list(filter(os.path.isfile, expectedOutputFiles))
352  if len(foundOutputFiles) != len(expectedOutputFiles):
353  raise JobCheckError("only %d/%d output files still present"
354  % (len(foundOutputFiles), len(expectedOutputFiles)))
355  # if
356 
357  except JobCheckError as e:
358  logging.error("%s: %s", jobName, e)
359  return False
360  else:
361  logging.info("%s succeeded.", jobName)
362  return True
363 
364  # checkJobGoodness()
365 
366 
367  def collectJobOutputFiles(self, jobInfo):
368 
369  outputFileList = os.path.join(jobInfo.outdir, 'filesana.list')
370  self.outputFileList.extend(FileListIterator(listName=outputFileList))
371 
372  # collectJobOutputFiles()
373 
374 
375  def writeList(self, content, fileName, tag) -> "Whether the file list was written":
376 
377  if not fileName: return False # we are not asked to write the list
378 
379  # some file systems do not support overwriting
380  if os.path.exists(fileName):
381  try: os.remove(fileName)
382  except IOError as e:
383  logging.warning("Could not delete the old %s file list '%s': %s.",
384  tag, fileName, e)
385  # try ... except
386  # if
387 
388  # we do not write the list if it would be empty
389  if not len(content): return False
390 
391  listDir = os.path.dirname(fileName)
392  os.makedirs(os.path.normpath(listDir), exist_ok=True)
393 
394  with open(fileName, 'w') as listFile:
395 
396  print("# {} file list created on {}: {:d} entries".format(
397  tag, time.ctime(), len(content)
398  ), file=listFile,
399  )
400  listFile.write("\n".join(content))
401  listFile.write("\n")
402  # with
403 
404  logging.info("File list for %s created as '%s' with %d entries.",
405  tag, fileName, len(content))
406 
407  return True
408  # writeList()
409 
410 
411  def writeSummary(self):
412 
413  nJobs = len(self.badList) + len(self.goodList)
414 
415  # save file lists
416  if len(self.badList) > 0:
417 
418  if len(self.goodList) == 0:
419  logging.info("None of the %d jobs was successful!!", nJobs)
420  else:
421  logging.info("%d/%d jobs were not successful.", len(self.badList), nJobs)
422 
423  elif nJobs > 0:
424 
425  logging.info("All %d jobs were successful.", nJobs)
426 
427  else:
428  logging.error("No jobs checked.")
429 
430  try:
431  self.writeList(self.goodList, self.goodListName, "successful jobs")
432  except IOError as e:
433  logging.critical("Could not write good job file list '%s': %s",
434  self.goodListName, e)
435  # try
436 
437  try:
438  self.writeList(self.badList, self.badListName, "non-successful jobs")
439  except IOError as e:
440  logging.critical("Could not write bad job file list '%s': %s",
441  self.badListName, e)
442  # try
443 
444  try:
445  self.writeList(self.outputFileList, self.outputFileListName, "output files")
446  except IOError as e:
447  logging.critical("Could not write output file list '%s': %s",
448  self.outputFileListName, e)
449  # try
450 
451 
452  # writeSummary()
453 
454 # class JobChecker
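# --- Editor's note (not part of the original script): a sketch of driving
# `JobChecker` programmatically rather than through the command line parsing
# below; "myjobs_xml.list" is a hypothetical XML job list, and `project.py`
# must be importable for the checks to succeed.
#
#   checker = JobChecker("myjobs_xml.list", fileList=False)
#   checker.checkFromFile("myjobs_xml.list", maxJobs=5)
#   checker.writeSummary()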
455 
456 
457 
458 if __name__ == "__main__":
459 
460  logging.basicConfig()
461 
462  import argparse
463 
464  Parser = argparse.ArgumentParser(description=__doc__)
465 
466  Parser.add_argument \
467  ("XMLfileList", help="list of XML configuration of the jobs to check")
468 
469  Parser.add_argument("--maxjobs", "-n", dest="MaxJobs", default=None, type=int,
470  help="if specified, process at most this number of jobs")
471 
472  Parser.add_argument("--debug", "-d", action="store_true",
473  help="enable debugging messages")
474 
475  Parser.add_argument \
476  ('--version', '-V', action='version', version="%(prog)s v" + __version__)
477 
478  jobListGroup = Parser.add_argument_group("Job lists")
479 
480  jobListGroup.add_argument("--goodlist", "-g", dest="GoodJobList",
481  default=None, help="name of the list to be created with all good jobs")
482  jobListGroup.add_argument("--badlist", "-b", dest="BadJobList",
483  default=None, help="name of the list to be created with all bad jobs")
484  jobListGroup.add_argument("--outputlist", "-o", dest="OutputFileList",
485  default=None, help="name of the list to be created with ROOT output files")
486 
487  jobListGroup.add_argument("--nogoodlist", "-G", dest="NoGoodJobList",
488  action="store_true", help="do not create a list with all good jobs")
489  jobListGroup.add_argument("--nobadlist", "-B", dest="NoBadJobList",
490  action="store_true", help="do not create a list with all bad jobs")
491  jobListGroup.add_argument("--nooutputlist", "-O", dest="NoOutputFileList",
492  action="store_true", help="do not create a list with all output files")
493 
494  jobListGroup.add_argument("--skipgood", dest="SkipGoodJobs",
495  action="store_true",
496  help="do not check jobs that are already in the good list"
497  )
498  jobListGroup.add_argument("--skipbad", dest="SkipBadJobs",
499  action="store_true",
500  help="do not check jobs that are already in the bad list"
501  )
502 
503  args = Parser.parse_args()
504 
505  logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
506 
507  try:
508  import project
509  except ImportError as e:
510  logging.error("Could not load `project.py` as Python module: %s", e)
511 
512  jobChecker = JobChecker(
513  args.XMLfileList,
514  goodList=(False if args.NoGoodJobList else args.GoodJobList),
515  badList=(False if args.NoBadJobList else args.BadJobList),
516  fileList=(False if args.NoOutputFileList else args.OutputFileList),
517  skipKnownGoodJobs=args.SkipGoodJobs,
518  skipKnownBadJobs=args.SkipBadJobs,
519  )
520 
521  try:
522  jobChecker.checkFromFile(args.XMLfileList, maxJobs=args.MaxJobs)
523  except KeyboardInterrupt:
524  logging.warning("\nCheck interrupted; file lists will not be changed.")
525  sys.exit(1)
526  #
527 
528  jobChecker.writeSummary()
529 
530  sys.exit(0)
531 # __main__
532 