All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
validate_in_job.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #=================================================================================
3 #
4 # Name: validate_in_job.py
5 #
6 # Purpose: Run validation checks on batch worker, including declaring files to sam.
7 #
8 # Options:
9 #
10 # --dir <dir> - Directory containing .root files on batch worker.
11 # --logdir <dir> - Log file directory on batch worker (lar.stat and .json files).
12 # --outdir <dir> - Final output directory (e.g. dCache) where .root files will be copied.
13 # --declare <0/1> - Flag for declaring files to sam.
14 # --copy <0/1> - Flag for copying files directly to dropbox.
15 # --maintain_parentage <0/1> - If true, override default parentage according to
16 # env variables JOBS_PARENTS and JOBS_AUNTS.
17 # --data_file_type - Specify data file type (repeatable, default "root").
18 #
19 # Environment variables:
20 #
21 # JOBS_PARENTS - Override parent files (space-separated list).
22 # JOBS_AUNTS - Override aunt files (space-separated list).
23 #
24 #=================================================================================
25 from __future__ import absolute_import
26 from __future__ import print_function
27 import sys, os, json
28 from larbatch_utilities import ifdh_cp
29 import project_utilities
30 import samweb_cli
31 
samweb = None           # Lazily-initialized SAMWebClient object (set by import_samweb).
extractor_dict = None   # Metadata extractor module (imported lazily by import_samweb).
proxy_ok = False        # Proxy-check flag; declared here for use across functions.
35 
36 # Check a single root file.
37 # Returns a 2-tuple containing the number of events and stream name.
38 # The number of events conveys the following information:
39 # 1. Number of events (>=0) in TTree named "Events."
40 # 2. -1 if root file does not contain an Events TTree, but is otherwise valid (openable).
41 # 3. -2 for error (root file does not exist or is not openable).
42 
def check_root_file(path, logdir):
    """Check a single root file using its precalculated json metadata.

    Arguments:
        path   - Full path of the root file on the batch worker.
        logdir - Log file directory expected to contain <basename>.json.

    Returns a 2-tuple (nevroot, stream):
        nevroot >= 0 - Number of events in the "Events" TTree.
        nevroot == -1 - File is valid (openable) but has no Events TTree.
        nevroot == -2 - Error (file missing, or json metadata unreadable).
        stream - Data stream name from metadata ('' if unknown).
    """

    result = (-2, '')

    # The root file itself must exist (error if not).

    if not project_utilities.safeexist(path):
        return result

    # See if we have precalculated metadata for this root file.

    json_path = os.path.join(logdir, os.path.basename(path) + '.json')
    if project_utilities.safeexist(json_path):

        # Get number of events from precalculated metadata.

        try:
            # saferead returns an iterable of lines; join and parse as json.

            s = ''.join(project_utilities.saferead(json_path))
            md = json.loads(s)

            # If we get this far, say the file was at least openable.

            result = (-1, '')

            # Extract number of events and stream name from metadata.

            if len(md) > 0:
                nevroot = -1
                stream = ''
                if 'events' in md:
                    nevroot = int(md['events'])
                if 'data_stream' in md:
                    stream = md['data_stream']
                result = (nevroot, stream)

        except Exception:
            # Unreadable or malformed json: report the file as not openable.
            result = (-2, '')

    return result
90 
91 
92 # Check data files in the specified directory.
93 
def check_root(outdir, logdir, data_file_types):

    # Scan outdir for files whose extension matches data_file_types and
    # classify each one with check_root_file.
    #
    # Returns a 3-tuple (nev, roots, hists):
    #   nev   - Total number of events summed over art root files
    #           (-1 if no art root file was found).
    #   roots - List of 3-tuples (full path, number of events, stream name),
    #           one per art root file.
    #   hists - List of full paths of valid non-art (histogram) root files.

    total_events = -1
    art_files = []
    hist_files = []

    print('Checking root files in directory %s.' % outdir)
    for fname in os.listdir(outdir):
        stem, ext = os.path.splitext(fname)
        if not (ext and ext[1:] in data_file_types):
            continue
        full_path = os.path.join(outdir, fname)
        nevroot, stream = check_root_file(full_path, logdir)
        if nevroot >= 0:
            if total_events < 0:
                total_events = 0
            total_events += nevroot
            art_files.append((full_path, nevroot, stream))
        elif nevroot == -1:

            # Valid data file, but not an art root file.

            hist_files.append(full_path)
        else:

            # Found a .root file that is not openable.
            # Print a warning, but don't trigger any other error.

            print('Warning: File %s in directory %s is not a valid root file.' % (fname, outdir))

    # Done.

    return (total_events, art_files, hist_files)
142 
def import_samweb():
    """Initialize the global samweb client and metadata extractor, if not
    already done.

    Side effects: sets the module-level 'samweb' object and binds the
    global name 'expMetaData' from the extractor_dict module.
    """

    # NOTE(review): the 'def import_samweb():' header was missing from the
    # extracted source (main() calls it); restored here.

    global samweb
    global extractor_dict
    global expMetaData

    if samweb is None:
        samweb = project_utilities.samweb()
        # Deferred import: binds the global expMetaData (declared above).
        from extractor_dict import expMetaData
154 
155 # Main program.
156 
def main():
    """Validate batch-worker outputs and optionally declare/copy them to SAM.

    Parses command-line options (documented in the file header), checks the
    lar exit status, classifies root files in the check directory, writes
    bookkeeping list files in the current directory, and, if requested,
    declares good files to SAM and copies them to the dropbox.

    Returns 0 on success, nonzero on any validation or declaration error.
    """

    ana = 0      # Set to 1 for analysis-style (histogram-only) validation.
    nproc = 0

    import_samweb()


    # Parse arguments.
    checkdir=''
    logdir=''
    outdir=''
    declare_file = 0
    copy_to_dropbox = 0
    maintain_parentage = 0
    data_file_types = []
    args = sys.argv[1:]
    while len(args) > 0:

        if args[0] == '--dir' and len(args) > 1:
            checkdir = args[1]
            del args[0:2]
        elif args[0] == '--logfiledir' and len(args) > 1:
            logdir = args[1]
            del args[0:2]
        elif args[0] == '--outdir' and len(args) > 1:
            outdir = args[1]
            del args[0:2]
        elif args[0] == '--declare' and len(args) > 1:
            declare_file = int(args[1])
            del args[0:2]
        elif args[0] == '--copy' and len(args) > 1:
            copy_to_dropbox = int(args[1])
            del args[0:2]
        elif args[0] == '--maintain_parentage' and len(args) > 1:
            maintain_parentage = int(args[1])
            del args[0:2]
        elif args[0] == '--data_file_type' and len(args) > 1:
            data_file_types.append(args[1])
            del args[0:2]
        else:
            print('Unknown option %s' % args[0])
            return 1

    # Add default data_file_types.

    if len(data_file_types) == 0:
        data_file_types.append('root')

    status = 0 # Global status code to tell us everything is ok.

    print("Do decleration in job: %d" % declare_file)

    # Check lar exit status (if any).
    stat_filename = os.path.join(logdir, 'lar.stat')
    if project_utilities.safeexist(stat_filename):
        try:
            status = int(project_utilities.saferead(stat_filename)[0].strip())
            if status != 0:
                print('Job in subdirectory %s ended with non-zero exit status %d.' % (checkdir, status))
                status = 1

        except:
            # NOTE(review): bare except deliberately treats any unreadable
            # lar.stat as a failed job rather than crashing validation.
            print('Bad file lar.stat in subdirectory %s.' % checkdir)
            status = 1

    if checkdir == '':
        print('No directory specified (use the --dir option.) Exiting.')
        return 1
    if logdir == '':
        print('No log file directory specified (use the --logfiledir option.) Exiting.')
        return 1

    nevts,rootfiles,hists = check_root(checkdir, logdir, data_file_types)

    # Set flag to do analysis-style validation if all of the following are true:
    #
    # 1. There is at least one valid histogram file.
    # 2. The total number of artroot files and artroot events is zero.

    if len(hists) > 0 and len(rootfiles) == 0 and nevts <= 0:
        ana = 1

    if not ana:
        if len(rootfiles) == 0 or nevts < 0:
            print('Problem with root file(s) in %s.' % checkdir)
            status = 1


    elif nevts < -1 or len(hists) == 0:
        print('Problem with analysis root file(s) in %s.' % checkdir)
        status = 1


# Then we need to loop over rootfiles and hists because those are good.
# Then we could make a list of those and check that the file in question for
# declaration is in that list.  Also require that the lar exit code is good for
# declaration.

    validate_list = open('validate.list','w')
    file_list = open('files.list', 'w')
    ana_file_list = open('filesana.list', 'w')

    events_list = open('events.list', 'w')

    # Will be empty if the checks succeed.
    bad_list = open('bad.list', 'w')
    missing_list = open('missing_files.list', 'w')

    # Print summary.

    if ana:
        print("%d processes completed successfully." % nproc)
        print("%d total good histogram files." % len(hists))

    else:
        print("%d total good events." % nevts)
        print("%d total good root files." % len(rootfiles))
        print("%d total good histogram files." % len(hists))

    # Per-stream list files, opened lazily keyed by stream name.
    file_list_stream = {}

    # Generate bookkeeping files pertaining to artroot files.

    for rootfile in rootfiles:

        rootpath = rootfile[0]
        nev = rootfile[1]
        streamname = rootfile[2]

        # Make sure root file names do not exceed 200 characters.
        rootname = os.path.basename(rootpath)
        if len(rootname) >= 200:
            print('Filename %s in subdirectory %s is longer than 200 characters.' % (
                rootname, outdir))
            status = 1

        if streamname not in file_list_stream:
            file_list_stream[streamname] = open('files_%s.list' % streamname, 'w')
        validate_list.write(rootpath + '\n')
        # Bookkeeping lists record the file's final location in outdir.
        file_on_scratch = os.path.join(outdir, os.path.basename(rootpath))
        file_list.write(file_on_scratch + '\n')
        file_list_stream[streamname].write(file_on_scratch + '\n')
        events_list.write('%s %d \n' % (file_on_scratch, nev) )

    # Generate bookkeeping files pertaining to analysis files.

    for histfile in hists:
        validate_list.write(histfile + '\n')
        file_on_scratch = os.path.join(outdir, os.path.basename(histfile))
        ana_file_list.write(file_on_scratch + '\n')



    validate_list.close()
    file_list.close()
    ana_file_list.close()
    for streamname in list(file_list_stream.keys()):
        file_list_stream[streamname].close()
    events_list.close()

    # Decide at this point if all the checks are ok.  Write to missing_files
    # list first (it records the status code).
    missing_list.write('%d \n' %status)

    if status == 0:
        bad_list.close()

        # Begin SAM declaration.

        if declare_file:

            # Declare artroot files.

            for rootfile in rootfiles:

                rootpath = rootfile[0]
                fn = os.path.basename(rootpath)
                declare_ok = False

                # Decide if we need to declare this file.
                # It is OK if the file is already declared.
                # In that case, do not try to declare it again.

                try:
                    md = samweb.getMetadata(fn)
                    if len(md) > 0:
                        declare_ok = True
                        print('File %s is already declared.' % fn)
                except:
                    # Lookup failure is taken to mean "not declared yet".
                    declare_ok = False

                if not declare_ok:
                    print('Declaring %s' % fn)
                    expSpecificMetaData = expMetaData(project_utilities.get_experiment(), rootpath)
                    md = expSpecificMetaData.getmetadata()

                    # Decide if we want to override the internal parentage metadata.

                    if maintain_parentage == 1:

                        # Delete the old parents, if any.

                        if 'parents' in md:
                            del md['parents']

                        # Change the parentage of the file based on its parents
                        # and aunts from condor_lar (space-separated env vars).

                        jobs_parents = os.getenv('JOBS_PARENTS', '').split(" ")
                        jobs_aunts = os.getenv('JOBS_AUNTS', '').split(" ")
                        if(jobs_parents[0] != '' ):
                            md['parents'] = [{'file_name': parent} for parent in jobs_parents]
                        if(jobs_aunts[0] != '' ):
                            for aunt in jobs_aunts:
                                mixparent_dict = {'file_name': aunt}
                                if 'parents' not in md:
                                    md['parents'] = []
                                md['parents'].append(mixparent_dict)

                    if len(md) > 0:
                        project_utilities.test_kca()

                        # Make lack of parent files a nonfatal error.
                        # This should probably be removed at some point.

                        try:
                            samweb.declareFile(md=md)
                            declare_ok = True

                        except samweb_cli.exceptions.SAMWebHTTPError as e:
                            print(e)
                            print('SAM declare failed.')
                            return 1

                        except:
                            print('SAM declare failed.')
                            return 1

                    else:
                        print('No sam metadata found for %s.' % fn)
                        declare_ok = False
                        status = 1

                if copy_to_dropbox == 1 and declare_ok:
                    print("Copying to Dropbox")
                    dropbox_dir = project_utilities.get_dropbox(fn)
                    rootPath = os.path.join(dropbox_dir, fn)
                    jsonPath = rootPath + ".json"
                    ifdh_cp(rootpath, rootPath)

            # Declare histogram files.

            for histpath in hists:

                declare_ok = False
                fn = os.path.basename(histpath)

                # Decide if we need to declare this file.
                # It is OK if the file is already declared.
                # In that case, do not try to declare it again.

                try:
                    md = samweb.getMetadata(fn)
                    if len(md) > 0:
                        declare_ok = True
                        print('File %s is already declared.' % fn)
                except:
                    # Lookup failure is taken to mean "not declared yet".
                    declare_ok = False

                if not declare_ok:
                    print('Declaring %s' % fn)
                    json_file = os.path.join(logdir, fn + '.json')

                    # Get metadata from json file in logdir, if present.

                    md = {}
                    if project_utilities.safeexist(json_file):
                        mdlines = project_utilities.saferead(json_file)
                        mdtext = ''
                        for line in mdlines:
                            mdtext = mdtext + line
                        try:
                            md = json.loads(mdtext)
                        except:
                            # Malformed json: declare with empty metadata below.
                            md = {}
                            pass

                    if maintain_parentage == 1:

                        # Delete the old parents, if any.

                        if 'parents' in md:
                            del md['parents']

                        # Change the parentage of the file based on its parents
                        # and aunts from condor_lar (space-separated env vars).

                        jobs_parents = os.getenv('JOBS_PARENTS', '').split(" ")
                        jobs_aunts = os.getenv('JOBS_AUNTS', '').split(" ")
                        if(jobs_parents[0] != '' ):
                            md['parents'] = [{'file_name': parent} for parent in jobs_parents]
                        if(jobs_aunts[0] != '' ):
                            for aunt in jobs_aunts:
                                mixparent_dict = {'file_name': aunt}
                                if 'parents' not in md:
                                    md['parents'] = []
                                md['parents'].append(mixparent_dict)

                    if len(md) > 0 and 'file_type' in md:
                        project_utilities.test_kca()

                        # Make lack of parent files a nonfatal error.
                        # This should probably be removed at some point.
                        # NOTE(review): unlike the artroot loop above, a failed
                        # histogram declaration is nonfatal (no early return).

                        try:
                            samweb.declareFile(md=md)
                            declare_ok = True

                        except samweb_cli.exceptions.SAMWebHTTPError as e:
                            print(e)
                            print('SAM declare failed.')
                            declare_ok = False

                        except:
                            print('SAM declare failed.')
                            declare_ok = False

                    else:
                        print('No sam metadata found for %s.' % fn)
                        declare_ok = False

                if copy_to_dropbox == 1 and declare_ok:
                    print("Copying to Dropbox")
                    dropbox_dir = project_utilities.get_dropbox(fn)
                    rootPath = dropbox_dir + "/" + fn
                    jsonPath = rootPath + ".json"
                    ifdh_cp(histpath, rootPath)

        return status

    # Something went wrong, so make a list of bad directories and potentially
    # missing files.
    else:
        # First get the subdir name on pnfs; this contains the job id.
        dir_on_scratch = os.path.basename(outdir)
        print('Dir on scratch ' + dir_on_scratch)
        bad_list.write('%s \n' % dir_on_scratch)
        bad_list.close()
        return status
503 
504 
# Script entry point: exit with main()'s status code.
if __name__ == '__main__' :
    sys.exit(main())
then if[["$THISISATEST"==1]]
Definition: neoSmazza.sh:95
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def write
Definition: util.py:23
print OUTPUT<< EOF;< setup name="Default"version="1.0">< worldref="volWorld"/></setup ></gdml > EOF close(OUTPUT)
list
Definition: file_to_url.sh:28
open(RACETRACK) or die("Could not open file $RACETRACK for writing")