All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Functions | Variables
project Namespace Reference

Functions

def import_samweb
 
def docleanx
 
def dostatus
 
def find_projects
 
def get_projects
 
def select_project
 
def get_project
 
def next_stage
 
def previous_stage
 
def get_pubs_stage
 
def check_root_file
 
def check_root
 
def get_input_files
 
def doshorten
 
def untarlog
 
def docheck
 
def doquickcheck
 
def dofetchlog
 
def docheck_declarations
 
def dotest_declarations
 
def docheck_definition
 
def dotest_definition
 
def doundefine
 
def docheck_locations
 
def docheck_tape
 
def dojobsub
 
def dosubmit
 
def domerge
 
def doaudit
 
def help
 
def normxmlpath
 
def xmlhelp
 
def main
 
def safeopen
 
def scan_file
 

Variables

 samweb = None
 
 extractor_dict = None
 
 proxy_ok = False
 

Function Documentation

def project.check_root (   outdir,
  logdir,
  data_file_types 
)

Definition at line 889 of file project.py.

890 def check_root(outdir, logdir, data_file_types):
891 
892  # This method looks for files with file types matching data_file_types.
893  # If such files are found, it also checks for the existence of
894  # an Events TTree.
895  #
896  # Returns a 3-tuple containing the following information.
897  # 1. Total number of events in art root files.
898  # 2. A list of 3-tuples with an entry for each art root file.
899  # The 3-tuple contains the following information.
900  # a) Filename (full path).
901  # b) Number of events
902  # c) Stream name.
903  # 3. A list of histogram root files.
904 
905  nev = -1
906  roots = []
907  hists = []
908 
909  print('Checking root files in directory %s.' % outdir)
910  filenames = larbatch_posix.listdir(outdir)
911  for filename in filenames:
912  name, ext = os.path.splitext(filename)
913  if len(ext) > 0 and ext[1:] in data_file_types:
914  path = os.path.join(outdir, filename)
915  nevroot, stream = check_root_file(path, logdir)
916  if nevroot >= 0:
917  if nev < 0:
918  nev = 0
919  nev = nev + nevroot
920  roots.append((os.path.join(outdir, filename), nevroot, stream))
921 
922  elif nevroot == -1:
923 
924  # Valid data file, not an art root file.
925 
926  hists.append(os.path.join(outdir, filename))
927 
928  else:
929 
930  # Found a .root file that is not openable.
931  # Print a warning, but don't trigger any other error.
932 
933  print('Warning: File %s in directory %s is not a valid root file.' % (filename, outdir))
934 
935  # Done.
936 
937  return (nev, roots, hists)
938 
939 
940 # Get the list of input files for a project stage.
def check_root
Definition: project.py:889
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def check_root_file
Definition: project.py:838
def project.check_root_file (   path,
  logdir 
)

Definition at line 838 of file project.py.

839 def check_root_file(path, logdir):
840 
841  global proxy_ok
842  result = (-2, '')
843  json_ok = False
844  md = []
845 
846  # First check if root file exists (error if not).
847 
848  if not larbatch_posix.exists(path):
849  return result
850 
851  # See if we have precalculated metadata for this root file.
852 
853  json_path = os.path.join(logdir, os.path.basename(path) + '.json')
854  if larbatch_posix.exists(json_path):
855 
856  # Get number of events from precalculated metadata.
857 
858  try:
859  lines = larbatch_posix.readlines(json_path)
860  s = ''
861  for line in lines:
862  s = s + line
863 
864  # Convert json string to python dictionary.
865 
866  md = json.loads(s)
867 
868  # If we get this far, say the file was at least openable.
869 
870  result = (-1, '')
871 
872  # Extract number of events and stream name from metadata.
873 
874  if len(list(md.keys())) > 0:
875  nevroot = -1
876  stream = ''
877  if 'events' in md:
878  nevroot = int(md['events'])
879  if 'data_stream' in md:
880  stream = md['data_stream']
881  result = (nevroot, stream)
882  json_ok = True
883  except:
884  result = (-2, '')
885  return result
886 
887 
888 # Check data files in the specified directory.
def check_root_file
Definition: project.py:838
list
Definition: file_to_url.sh:28
def project.doaudit (   stage)

Definition at line 3778 of file project.py.

3779 def doaudit(stage):
3780 
3781  import_samweb()
3782  stage_has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
3783  if not stage_has_input:
3784  raise RuntimeError('No auditing for generator stage.')
3785 
3786  # Are there other ways to get output files other than through definition!?
3787 
3788  outputlist = []
3789  outparentlist = []
3790  if stage.defname != '':
3791  query = 'isparentof: (defname: %s) and availability: anylocation' %(stage.defname)
3792  try:
3793  outparentlist = samweb.listFiles(dimensions=query)
3794  outputlist = samweb.listFiles(defname=stage.defname)
3795  except:
3796  raise RuntimeError('Error accessing sam information for definition %s.\nDoes definition exist?' % stage.defname)
3797  else:
3798  raise RuntimeError('Output definition not found.')
3799 
3800  # To get input files one can use definition or get inputlist given to that stage or
3801  # get input files for a given stage as get_input_files(stage)
3802 
3803  inputlist = []
3804  if stage.inputdef != '':
3805  import_samweb()
3806  inputlist=samweb.listFiles(defname=stage.inputdef)
3807  elif stage.inputlist != '':
3808  ilist = []
3809  if larbatch_posix.exists(stage.inputlist):
3810  ilist = larbatch_posix.readlines(stage.inputlist)
3811  inputlist = []
3812  for i in ilist:
3813  inputlist.append(os.path.basename(i.strip()))
3814  else:
3815  raise RuntimeError('Input definition and/or input list does not exist.')
3816 
3817  difflist = set(inputlist)^set(outparentlist)
3818  mc = 0;
3819  me = 0;
3820  for item in difflist:
3821  if item in inputlist:
3822  mc = mc+1
3823  if mc==1:
3824  missingfilelistname = os.path.join(stage.bookdir, 'missingfiles.list')
3825  missingfilelist = safeopen(missingfilelistname)
3826  if mc>=1:
3827  missingfilelist.write("%s\n" %item)
3828  elif item in outparentlist:
3829  me = me+1
3830  childcmd = 'samweb list-files "ischildof: (file_name=%s) and availability: physical"' %(item)
3831  children = convert_str(subprocess.check_output(childcmd, shell=True)).splitlines()
3832  rmfile = list(set(children) & set(outputlist))[0]
3833  if me ==1:
3834  flist = []
3835  fnlist = os.path.join(stage.bookdir, 'files.list')
3836  if larbatch_posix.exists(fnlist):
3837  flist = larbatch_posix.readlines(fnlist)
3838  slist = []
3839  for line in flist:
3840  slist.append(line.split()[0])
3841  else:
3842  raise RuntimeError('No files.list file found %s, run project.py --check' % fnlist)
3843 
3844  # Declare the content status of the file as bad in SAM.
3845 
3846  sdict = {'content_status':'bad'}
3847  project_utilities.test_kca()
3848  samweb.modifyFileMetadata(rmfile, sdict)
3849  print('\nDeclaring the status of the following file as bad:', rmfile)
3850 
3851  # Remove this file from the files.list in the output directory.
3852 
3853  fn = []
3854  fn = [x for x in slist if os.path.basename(x.strip()) != rmfile]
3855  thefile = safeopen(fnlist)
3856  for item in fn:
3857  thefile.write("%s\n" % item)
3858 
3859  if mc==0 and me==0:
3860  print("Everything in order.")
3861  return 0
3862  else:
3863  print('Missing parent file(s) = ', mc)
3864  print('Extra parent file(s) = ',me)
3865 
3866  if mc != 0:
3867  missingfilelist.close()
3868  print("Creating missingfiles.list in the output directory....done!")
3869  if me != 0:
3870  thefile.close()
3871  #larbatch_posix.remove("jsonfile.json")
3872  print("For extra parent files, files.list redefined and content status declared as bad in SAM...done!")
3873 
3874 
3875 # Print help.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def safeopen
Definition: project.py:4739
def import_samweb
Definition: project.py:504
def doaudit
Definition: project.py:3778
list
Definition: file_to_url.sh:28
def project.docheck (   project,
  stage,
  ana,
  quick = False 
)

Definition at line 1087 of file project.py.

1088 def docheck(project, stage, ana, quick=False):
1089 
1090  # This method performs various checks on worker subdirectories, named
1091  # as <cluster>_<process>, where <cluster> and <process> are integers.
1092  # In contrast, sam start and stop project jobs are named as
1093  # <cluster>_start and <cluster>_stop.
1094  #
1095  # Return 0 if all checks are OK, meaning:
1096  # a) No errors detected for any process.
1097  # b) At least one good root file (if not ana).
1098  # Otherwise return nonzero.
1099  #
1100  # The following checks are performed.
1101  #
1102  # 1. Make sure subdirectory names are as expected.
1103  #
1104  # 2. Look for at least one art root file in each worker subdirectory
1105  # containing a valid Events TTree. Complain about any
1106  # that do not contain such a root file.
1107  #
1108  # 3. Check that the number of events in the Events tree are as expected.
1109  #
1110  # 4. Complain about any duplicated art root file names (if sam metadata is defined).
1111  #
1112  # 5. Check job exit status (saved in lar.stat).
1113  #
1114  # 6. For sam input, make sure that files sam_project.txt and cpid.txt are present.
1115  #
1116  # 7. Check that any non-art root files are openable.
1117  #
1118  # 8. Make sure file names do not exceed 200 characters (if sam metadata is defined).
1119  #
1120  # In analysis mode (if argument ana != 0), skip checks 2-4, but still do
1121  # checks 1 and 5-7.
1122  #
1123  # This function also creates the following files in the specified directory.
1124  #
1125  # 1. files.list - List of good root files.
1126  # 2. events.list - List of good root files and number of events in each file.
1127  # 3. bad.list - List of worker subdirectories with problems.
1128  # 4. missing_files.list - List of unprocessed input files.
1129  # 5. sam_projects.list - List of successful sam projects.
1130  # 6. cpids.list - list of successful consumer process ids.
1131  # 7. filesana.list - List of non-art root files (histograms and/or ntuples).
1132  #
1133  # For projects with no input (i.e. generator jobs), if there are fewer than
1134  # the requisite number of good generator jobs, a "missing_files.list" will be
1135  # generated with lines containing /dev/null.
1136 
1137  # Untar log files into bookdir.
1138 
1139  untarlog(stage)
1140 
1141  # Quick check?
1142 
1143  if quick == 1 and not ana:
1144  return doquickcheck(project, stage, ana)
1145 
1146  stage.checkinput()
1147 
1148  # Check that output and log directories exist.
1149 
1150  if not larbatch_posix.exists(stage.outdir):
1151  print('Output directory %s does not exist.' % stage.outdir)
1152  return 1
1153  if not larbatch_posix.exists(stage.bookdir):
1154  print('Log directory %s does not exist.' % stage.bookdir)
1155  return 1
1156 
1157  import_samweb()
1158  has_metadata = project.file_type != '' or project.run_type != ''
1159  has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
1160  print('Checking directory %s' % stage.bookdir)
1161 
1162  # Count total number of events and root files.
1163 
1164  nev_tot = 0
1165  nroot_tot = 0
1166 
1167  # Loop over subdirectories (ignore files and directories named *_start and *_stop).
1168 
1169  procmap = {} # procmap[subdir] = <list of art root files and event counts>
1170  processes = [] # Integer process numbers derived from subdirectory names.
1171  filesana = [] # List of non-art root files.
1172  sam_projects = [] # List of sam projects.
1173  cpids = [] # List of successful sam consumer process ids.
1174  uris = [] # List of input files processed successfully.
1175  bad_workers = [] # List of bad worker subdirectories.
1176 
1177 
1178  for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):
1179 
1180  # Only examine files in leaf directories.
1181 
1182  if len(subdirs) != 0:
1183  continue
1184 
1185  subdir = os.path.relpath(log_subpath, stage.bookdir)
1186  if subdir == '.':
1187  continue
1188  out_subpath = os.path.join(stage.outdir, subdir)
1189  dirok = project_utilities.fast_isdir(log_subpath)
1190 
1191  # Update list of sam projects from start job.
1192 
1193  if dirok and log_subpath[-6:] == '_start':
1194  filename = os.path.join(log_subpath, 'sam_project.txt')
1195  if larbatch_posix.exists(filename):
1196  sam_project = larbatch_posix.readlines(filename)[0].strip()
1197  if sam_project != '' and not sam_project in sam_projects:
1198  sam_projects.append(sam_project)
1199 
1200  # Regular worker jobs checked here.
1201 
1202  if dirok and not subdir[-6:] == '_start' and not subdir[-5:] == '_stop' \
1203  and not subdir == 'log':
1204 
1205  bad = 0
1206 
1207  # Make sure that corresponding output directory exists.
1208 
1209  if not project_utilities.fast_isdir(out_subpath):
1210  print('No output directory corresponding to subdirectory %s.' % subdir)
1211  bad = 1
1212 
1213  # Check lar exit status (if any).
1214 
1215  if not bad:
1216  stat_filename = os.path.join(log_subpath, 'lar.stat')
1217  if larbatch_posix.exists(stat_filename):
1218  status = 0
1219  try:
1220  status = int(larbatch_posix.readlines(stat_filename)[0].strip())
1221  if status != 0:
1222  print('Job in subdirectory %s ended with non-zero exit status %d.' % (
1223  subdir, status))
1224  bad = 1
1225  except:
1226  print('Bad file lar.stat in subdirectory %s.' % subdir)
1227  bad = 1
1228 
1229  # Now check root files in this subdirectory.
1230 
1231  if not bad:
1232  nev = 0
1233  roots = []
1234  nev, roots, subhists = check_root(out_subpath, log_subpath, stage.datafiletypes)
1235  if not ana:
1236  if len(roots) == 0 or nev < 0:
1237  print('Problem with root file(s) in subdirectory %s.' % subdir)
1238  bad = 1
1239  elif nev < -1 or len(subhists) == 0:
1240  print('Problem with analysis root file(s) in subdirectory %s.' % subdir)
1241  bad = 1
1242 
1243 
1244  # Check for duplicate filenames (only if metadata is being generated).
1245 
1246  if not bad and has_metadata:
1247  for root in roots:
1248  rootname = os.path.basename(root[0])
1249  for s in list(procmap.keys()):
1250  oldroots = procmap[s]
1251  for oldroot in oldroots:
1252  oldrootname = os.path.basename(oldroot[0])
1253  if rootname == oldrootname:
1254  print('Duplicate filename %s in subdirectory %s' % (rootname,
1255  subdir))
1256  olddir = os.path.basename(os.path.dirname(oldroot[0]))
1257  print('Previous subdirectory %s' % olddir)
1258  bad = 1
1259 
1260  # Make sure root file names do not exceed 200 characters.
1261 
1262  if not bad and has_metadata:
1263  for root in roots:
1264  rootname = os.path.basename(root[0])
1265  if len(rootname) >= 200:
1266  print('Filename %s in subdirectory %s is longer than 200 characters.' % (
1267  rootname, subdir))
1268  bad = 1
1269 
1270  # Check existence of sam_project.txt and cpid.txt.
1271  # Update sam_projects and cpids.
1272 
1273  if not bad and stage.inputdef != '':
1274  filename1 = os.path.join(log_subpath, 'sam_project.txt')
1275  if not larbatch_posix.exists(filename1):
1276  print('Could not find file sam_project.txt')
1277  bad = 1
1278  filename2 = os.path.join(log_subpath, 'cpid.txt')
1279  if not larbatch_posix.exists(filename2):
1280  print('Could not find file cpid.txt')
1281  bad = 1
1282  if not bad:
1283  sam_project = larbatch_posix.readlines(filename1)[0].strip()
1284  if not sam_project in sam_projects:
1285  sam_projects.append(sam_project)
1286  cpid = larbatch_posix.readlines(filename2)[0].strip()
1287  if not cpid in cpids:
1288  cpids.append(cpid)
1289 
1290  # Check existence of transferred_uris.list.
1291  # Update list of uris.
1292 
1293  if not bad and (stage.inputlist !='' or stage.inputfile != ''):
1294  filename = os.path.join(log_subpath, 'transferred_uris.list')
1295  if not larbatch_posix.exists(filename):
1296  print('Could not find file transferred_uris.list')
1297  bad = 1
1298  if not bad:
1299  lines = larbatch_posix.readlines(filename)
1300  for line in lines:
1301  uri = line.strip()
1302  if uri != '':
1303  uris.append(uri)
1304 
1305  # Save process number, and check for duplicate process numbers
1306  # (only if no input).
1307 
1308  if not has_input:
1309  subdir_split = subdir.split('_')
1310  if len(subdir_split) > 1:
1311  process = int(subdir_split[1])
1312  if process in processes:
1313  print('Duplicate process number')
1314  bad = 1
1315  else:
1316  processes.append(process)
1317 
1318  # Save information about good root files.
1319 
1320  if not bad:
1321  procmap[subdir] = roots
1322 
1323  # Save good histogram files.
1324 
1325  filesana.extend(subhists)
1326 
1327  # Count good events and root files.
1328 
1329  nev_tot = nev_tot + nev
1330  nroot_tot = nroot_tot + len(roots)
1331 
1332  # Update list of bad workers.
1333 
1334  if bad:
1335  bad_workers.append(subdir)
1336 
1337  # Print/save result of checks for one subdirectory.
1338 
1339  if bad:
1340  print('Bad subdirectory %s.' % subdir)
1341 
1342  # Done looping over subdirectories.
1343  # Dictionary procmap now contains a list of good processes
1344  # and root files.
1345 
1346  # Before attempting to create bookkeeping files in stage.bookdir, check
1347  # whether this directory is readable. If not readable, return error
1348  # status without creating any bookkeeping files. This is to prevent
1349  # hangs.
1350 
1351  contents = larbatch_posix.listdir(stage.bookdir)
1352  if len(contents) == 0:
1353  print('Directory %s may be dead.' % stage.bookdir)
1354  print('Returning error status without creating any bookkeeping files.')
1355  return 1
1356 
1357  # Open files.
1358 
1359  filelistname = os.path.join(stage.bookdir, 'files.list')
1360  filelist = safeopen(filelistname)
1361 
1362  eventslistname = os.path.join(stage.bookdir, 'events.list')
1363  eventslist = safeopen(eventslistname)
1364 
1365  badfilename = os.path.join(stage.bookdir, 'bad.list')
1366  badfile = safeopen(badfilename)
1367 
1368  missingfilesname = os.path.join(stage.bookdir, 'missing_files.list')
1369  missingfiles = safeopen(missingfilesname)
1370 
1371  filesanalistname = os.path.join(stage.bookdir, 'filesana.list')
1372  filesanalist = safeopen(filesanalistname)
1373 
1374  urislistname = os.path.join(stage.bookdir, 'transferred_uris.list')
1375  urislist = safeopen(urislistname)
1376 
1377  # Generate "files.list" and "events.list."
1378  # Also fill stream-specific file list.
1379 
1380  nproc = 0
1381  streams = {} # {stream: file}
1382  nfile = 0
1383  for s in list(procmap.keys()):
1384  nproc = nproc + 1
1385  for root in procmap[s]:
1386  nfile = nfile + 1
1387  filelist.write('%s\n' % root[0])
1388  eventslist.write('%s %d\n' % root[:2])
1389  stream = root[2]
1390  if stream != '':
1391  if stream not in streams:
1392  streamlistname = os.path.join(stage.bookdir, 'files_%s.list' % stream)
1393  streams[stream] = safeopen(streamlistname)
1394  streams[stream].write('%s\n' % root[0])
1395 
1396  # Generate "bad.list"
1397 
1398  nerror = 0
1399  for bad_worker in bad_workers:
1400  badfile.write('%s\n' % bad_worker)
1401  nerror = nerror + 1
1402 
1403  # Generate "missing_files.list."
1404 
1405  nmiss = 0
1406  if stage.inputdef == '' and not stage.pubs_output:
1407  input_files = get_input_files(stage)
1408  if len(input_files) > 0:
1409  missing_files = list(set(input_files) - set(uris))
1410  for missing_file in missing_files:
1411  missingfiles.write('%s\n' % missing_file)
1412  nmiss = nmiss + 1
1413  else:
1414  nmiss = stage.num_jobs - len(procmap)
1415  for n in range(nmiss):
1416  missingfiles.write('/dev/null\n')
1417 
1418 
1419  # Generate "filesana.list."
1420 
1421  for hist in filesana:
1422  filesanalist.write('%s\n' % hist)
1423 
1424  # Generate "transferred_uris.list."
1425 
1426  for uri in uris:
1427  urislist.write('%s\n' % uri)
1428 
1429  # Print summary.
1430 
1431  if ana:
1432  print("%d processes completed successfully." % nproc)
1433  print("%d total good histogram files." % len(filesana))
1434  else:
1435  print("%d total good events." % nev_tot)
1436  print("%d total good root files." % nroot_tot)
1437  print("%d total good histogram files." % len(filesana))
1438 
1439  # Close files.
1440 
1441  filelist.close()
1442  if nfile == 0:
1443  project_utilities.addLayerTwo(filelistname)
1444  eventslist.close()
1445  if nfile == 0:
1446  project_utilities.addLayerTwo(eventslistname)
1447  if nerror == 0:
1448  badfile.write('\n')
1449  badfile.close()
1450  if nmiss == 0:
1451  missingfiles.write('\n')
1452  missingfiles.close()
1453  filesanalist.close()
1454  if len(filesana) == 0:
1455  project_utilities.addLayerTwo(filesanalistname)
1456  if len(uris) == 0:
1457  urislist.write('\n')
1458  urislist.close()
1459  for stream in list(streams.keys()):
1460  streams[stream].close()
1461 
1462  # Make sam files.
1463 
1464  if stage.inputdef != '' and not stage.pubs_input:
1465 
1466  # List of successful sam projects.
1467 
1468  sam_projects_filename = os.path.join(stage.bookdir, 'sam_projects.list')
1469  sam_projects_file = safeopen(sam_projects_filename)
1470  for sam_project in sam_projects:
1471  sam_projects_file.write('%s\n' % sam_project)
1472  sam_projects_file.close()
1473  if len(sam_projects) == 0:
1474  project_utilities.addLayerTwo(sam_projects_filename)
1475 
1476  # List of successful consumer process ids.
1477 
1478  cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
1479  cpids_file = safeopen(cpids_filename)
1480  for cpid in cpids:
1481  cpids_file.write('%s\n' % cpid)
1482  cpids_file.close()
1483  if len(cpids) == 0:
1484  project_utilities.addLayerTwo(cpids_filename)
1485 
1486  # Get number of consumed files.
1487 
1488  cpids_list = ''
1489  sep = ''
1490  for cpid in cpids:
1491  cpids_list = cpids_list + '%s%s' % (sep, cpid)
1492  sep = ','
1493  if cpids_list != '':
1494  dim = 'consumer_process_id %s and consumed_status consumed' % cpids_list
1495  import_samweb()
1496  nconsumed = samweb.countFiles(dimensions=dim)
1497  else:
1498  nconsumed = 0
1499 
1500  # Get number of unconsumed files.
1501 
1502  if cpids_list != '':
1503  udim = '(defname: %s) minus (%s)' % (stage.inputdef, dim)
1504  else:
1505  udim = 'defname: %s' % stage.inputdef
1506  nunconsumed = samweb.countFiles(dimensions=udim)
1507  nerror = nerror + nunconsumed
1508 
1509  # Sam summary.
1510 
1511  print('%d sam projects.' % len(sam_projects))
1512  print('%d successful consumer process ids.' % len(cpids))
1513  print('%d files consumed.' % nconsumed)
1514  print('%d files not consumed.' % nunconsumed)
1515 
1516  # Check project statuses.
1517 
1518  for sam_project in sam_projects:
1519  print('\nChecking sam project %s' % sam_project)
1520  import_samweb()
1521  url = samweb.findProject(sam_project, project_utilities.get_experiment())
1522  if url != '':
1523  result = samweb.projectSummary(url)
1524  nd = 0
1525  nc = 0
1526  nf = 0
1527  nproc = 0
1528  nact = 0
1529  if 'processes' in result:
1530  processes = result['processes']
1531  for process in processes:
1532  nproc = nproc + 1
1533  if 'status' in process:
1534  if process['status'] == 'active':
1535  nact = nact + 1
1536  if 'counts' in process:
1537  counts = process['counts']
1538  if 'delivered' in counts:
1539  nd = nd + counts['delivered']
1540  if 'consumed' in counts:
1541  nc = nc + counts['consumed']
1542  if 'failed' in counts:
1543  nf = nf + counts['failed']
1544  print('Status: %s' % result['project_status'])
1545  print('%d total processes' % nproc)
1546  print('%d active processes' % nact)
1547  print('%d files in snapshot' % result['files_in_snapshot'])
1548  print('%d files delivered' % (nd + nc))
1549  print('%d files consumed' % nc)
1550  print('%d files failed' % nf)
1551  print()
1552 
1553  # Done
1554 
1555  checkfilename = os.path.join(stage.bookdir, 'checked')
1556  checkfile = safeopen(checkfilename)
1557  checkfile.write('\n')
1558  checkfile.close()
1559  project_utilities.addLayerTwo(checkfilename)
1560 
1561  if stage.inputdef == '' or stage.pubs_input:
1562  print('%d processes with errors.' % nerror)
1563  print('%d missing files.' % nmiss)
1564  else:
1565  print('%d unconsumed files.' % nerror)
1566 
1567  # Return error status if any error or not good root file produced.
1568  # Also return error if no successful processes were detected
1569 
1570  result = 0
1571  if nerror != 0:
1572  result = 1
1573  if not ana and nroot_tot == 0:
1574  result = 1
1575  if len(procmap) == 0:
1576  result = 1
1577  return result
def check_root
Definition: project.py:889
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def write
Definition: util.py:23
def docheck
Definition: project.py:1087
def safeopen
Definition: project.py:4739
def import_samweb
Definition: project.py:504
def untarlog
Definition: project.py:1013
def doquickcheck
Definition: project.py:1578
def get_input_files
Definition: project.py:941
print OUTPUT<< EOF;< setup name="Default"version="1.0">< worldref="volWorld"/></setup ></gdml > EOF close(OUTPUT)
list
Definition: file_to_url.sh:28
def project.docheck_declarations (   logdir,
  outdir,
  declare,
  ana = False 
)

Definition at line 2071 of file project.py.

2072 def docheck_declarations(logdir, outdir, declare, ana=False):
2073 
2074  # Default result success (all files declared).
2075 
2076  result = 0
2077 
2078  # Initialize samweb.
2079 
2080  import_samweb()
2081 
2082  # Loop over root files listed in files.list or filesana.list.
2083 
2084  roots = []
2085  listname = 'files.list'
2086  if ana:
2087  listname = 'filesana.list'
2088  fnlist = os.path.join(logdir, listname)
2089  if larbatch_posix.exists(fnlist):
2090  roots = larbatch_posix.readlines(fnlist)
2091  else:
2092  raise RuntimeError('No %s file found %s, run project.py --check' % (listname, fnlist))
2093 
2094  for root in roots:
2095  path = root.strip()
2096  fn = os.path.basename(path)
2097  dirpath = os.path.dirname(path)
2098  dirname = os.path.relpath(dirpath, outdir)
2099 
2100  # Check metadata
2101 
2102  has_metadata = False
2103  try:
2104  md = samweb.getMetadata(filenameorid=fn)
2105  has_metadata = True
2106  except samweb_cli.exceptions.FileNotFound:
2107  pass
2108 
2109  # Report or declare file.
2110 
2111  if has_metadata:
2112  print('Metadata OK: %s' % fn)
2113  else:
2114  if declare:
2115  print('Declaring: %s' % fn)
2116  jsonfile = os.path.join(logdir, os.path.join(dirname, fn)) + '.json'
2117  mdjson = {}
2118  if larbatch_posix.exists(jsonfile):
2119  mdlines = larbatch_posix.readlines(jsonfile)
2120  mdtext = ''
2121  for line in mdlines:
2122  mdtext = mdtext + line
2123  try:
2124  md = json.loads(mdtext)
2125  mdjson = md
2126  except:
2127  pass
2128  md = {}
2129  if ana:
2130  md = mdjson
2131  else:
2132  expSpecificMetaData = expMetaData(os.environ['SAM_EXPERIMENT'],larbatch_posix.root_stream(path))
2133  md = expSpecificMetaData.getmetadata(mdjson)
2134  if len(md) > 0:
2135  project_utilities.test_kca()
2136 
2137  # Make lack of parent files a nonfatal error.
2138  # This should probably be removed at some point.
2139 
2140  try:
2141  samweb.declareFile(md=md)
2142  except:
2143  #if md.has_key('parents'):
2144  # del md['parents']
2145  # samweb.declareFile(md=md)
2146  print('SAM declare failed.')
2147  result = 1
2148 
2149  else:
2150  print('No sam metadata found for %s.' % fn)
2151  else:
2152  print('Not declared: %s' % fn)
2153  result = 1
2154 
2155  return result
2156 
2157 # Print summary of files returned by sam query.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def docheck_declarations
Definition: project.py:2071
def project.docheck_definition (   defname,
  dim,
  define 
)

Definition at line 2176 of file project.py.

2177 def docheck_definition(defname, dim, define):
2178 
2179  # Default result success.
2180 
2181  result = 0
2182 
2183  # Return success for null definition.
2184 
2185  if defname == '':
2186  return result
2187 
2188  # Initialize samweb.
2189 
2190  import_samweb()
2191 
2192  # See if this definition already exists.
2193 
2194  def_exists = False
2195  try:
2196  desc = samweb.descDefinition(defname=defname)
2197  def_exists = True
2198  except samweb_cli.exceptions.DefinitionNotFound:
2199  pass
2200 
2201  # Make report and maybe make definition.
2202 
2203  if def_exists:
2204  print('Definition already exists: %s' % defname)
2205  else:
2206  if define:
2207  print('Creating definition %s.' % defname)
2208  project_utilities.test_kca()
2209  samweb.createDefinition(defname=defname, dims=dim)
2210  else:
2211  result = 1
2212  print('Definition should be created: %s' % defname)
2213 
2214  return result
2215 
2216 # Print summary of files returned by dataset definition.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def docheck_definition
Definition: project.py:2176
def project.docheck_locations (   dim,
  outdir,
  add,
  clean,
  remove,
  upload 
)

Definition at line 2265 of file project.py.

2266 def docheck_locations(dim, outdir, add, clean, remove, upload):
2267 
2268  if add:
2269  print('Adding disk locations.')
2270  elif clean:
2271  print('Cleaning disk locations.')
2272  elif remove:
2273  print('Removing disk locations.')
2274  elif upload:
2275  print('Uploading to FTS.')
2276  else:
2277  print('Checking disk locations.')
2278 
2279  # Initialize samweb.
2280 
2281  import_samweb()
2282 
2283  # Loop over files queried by dimension string.
2284 
2285  filelist = samweb.listFiles(dimensions=dim, stream=False)
2286 
2287  # Look for listed files on disk under outdir.
2288 
2289  disk_dict = {}
2290  for filename in filelist:
2291  disk_dict[filename] = []
2292  for out_subpath, subdirs, files in larbatch_posix.walk(outdir):
2293 
2294  # Only examine files in leaf directories.
2295 
2296  if len(subdirs) != 0:
2297  continue
2298 
2299  for fn in files:
2300  if fn in filelist:
2301  disk_dict[fn].append(out_subpath)
2302 
2303  # Check sam locations.
2304 
2305  for filename in filelist:
2306  disk_locs = disk_dict[filename]
2307  sam_locs = samweb.locateFile(filenameorid=filename)
2308  if len(sam_locs) == 0 and not upload:
2309  print('No location: %s' % filename)
2310 
2311  # Make a double loop over disk and sam locations, in order
2312  # to identify locations that should added.
2313  # Note that we ignore the node part of the sam location.
2314 
2315  locs_to_add = []
2316  for disk_loc in disk_locs:
2317  should_add = True
2318  for sam_loc in sam_locs:
2319  if sam_loc['location_type'] == 'disk':
2320  if disk_loc == sam_loc['location'].split(':')[-1]:
2321  should_add = False
2322  break
2323  if should_add:
2324  locs_to_add.append(disk_loc)
2325 
2326  # Loop over sam locations, in order to identify locations
2327  # that should be removed. Note that for this step, we don't
2328  # necessarily assume that we found the disk location
2329  # in the directory search above, rather check the existence
2330  # of the file directly.
2331 
2332  locs_to_remove = []
2333  for sam_loc in sam_locs:
2334  if sam_loc['location_type'] == 'disk':
2335 
2336  # If remove is specified, unconditionally remove this location.
2337 
2338  if remove:
2339  locs_to_remove.append(sam_loc['location'])
2340 
2341  # Otherwise, check if file exists.
2342 
2343  else:
2344 
2345  # Split off the node, if any, from the location.
2346 
2347  local_path = os.path.join(sam_loc['location'].split(':')[-1], filename)
2348  if not larbatch_posix.exists(local_path):
2349  locs_to_remove.append(sam_loc['location'])
2350 
2351  # Loop over sam locations and identify files that can be uploaded.
2352  # If this file has no disk locations, don't do anything (not an error).
2353  # In case we decide to upload this file, always upload from the first
2354  # disk location.
2355 
2356  locs_to_upload = {} # locs_to_upload[disk-location] = dropbox-directory
2357  should_upload = False
2358  if upload and len(disk_locs) > 0:
2359  should_upload = True
2360  for sam_loc in sam_locs:
2361  if sam_loc['location_type'] == 'tape':
2362  should_upload = False
2363  break
2364  if should_upload:
2365  dropbox = project_utilities.get_dropbox(filename)
2366  if not larbatch_posix.exists(dropbox):
2367  print('Making dropbox directory %s.' % dropbox)
2368  larbatch_posix.makedirs(dropbox)
2369  locs_to_upload[disk_locs[0]] = dropbox
2370 
2371  # Report results and do the actual adding/removing/uploading.
2372 
2373  for loc in locs_to_add:
2374  node = project_utilities.get_bluearc_server()
2375  if loc[0:6] == '/pnfs/':
2376  node = project_utilities.get_dcache_server()
2377  loc = node + loc.split(':')[-1]
2378  if add:
2379  print('Adding location: %s.' % loc)
2380  project_utilities.test_kca()
2381  samweb.addFileLocation(filenameorid=filename, location=loc)
2382  elif not upload:
2383  print('Can add location: %s.' % loc)
2384 
2385  for loc in locs_to_remove:
2386  if clean or remove:
2387  print('Removing location: %s.' % loc)
2388  project_utilities.test_kca()
2389  samweb.removeFileLocation(filenameorid=filename, location=loc)
2390  elif not upload:
2391  print('Should remove location: %s.' % loc)
2392 
2393  for loc in list(locs_to_upload.keys()):
2394  dropbox = locs_to_upload[loc]
2395 
2396  # Make sure dropbox directory exists.
2397 
2398  if not larbatch_posix.isdir(dropbox):
2399  print('Dropbox directory %s does not exist.' % dropbox)
2400  else:
2401 
2402  # Test whether this file has already been copied to dropbox directory.
2403 
2404  dropbox_filename = os.path.join(dropbox, filename)
2405  if larbatch_posix.exists(dropbox_filename):
2406  print('File %s already exists in dropbox %s.' % (filename, dropbox))
2407  else:
2408 
2409  # Copy file to dropbox.
2410 
2411  loc_filename = os.path.join(loc, filename)
2412 
2413  # Decide whether to use a symlink or copy.
2414 
2415  if project_utilities.mountpoint(loc_filename) == \
2416  project_utilities.mountpoint(dropbox_filename):
2417  print('Symlinking %s to dropbox directory %s.' % (filename, dropbox))
2418  relpath = os.path.relpath(os.path.realpath(loc_filename), dropbox)
2419  print('relpath=',relpath)
2420  print('dropbox_filename=',dropbox_filename)
2421  larbatch_posix.symlink(relpath, dropbox_filename)
2422 
2423  else:
2424  print('Copying %s to dropbox directory %s.' % (filename, dropbox))
2425  larbatch_posix.copy(loc_filename, dropbox_filename)
2426 
2427  return 0
2428 
2429 # Check tape locations.
2430 # Return 0 if all files in sam have tape locations.
2431 # Return nonzero if some files in sam don't have tape locations.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def docheck_locations
Definition: project.py:2265
list
Definition: file_to_url.sh:28
def project.docheck_tape (   dim)

Definition at line 2432 of file project.py.

def docheck_tape(dim):

    # Check tape locations of all files matching sam dimension string dim.
    #
    # Returns 0 if every matched file has a sam location whose
    # location_type is 'tape'; returns nonzero if any file lacks one.
    # Prints a per-file on-tape/not-on-tape report plus summary counts.

    # Default result success.

    result = 0

    # Initialize samweb (module-level client, created on first use).

    import_samweb()

    # Loop over files queried by dimension string.
    # listFiles with stream=True returns an iterator, so files are
    # processed one at a time without loading the whole list.

    nbad = 0
    ntot = 0
    for filename in samweb.listFiles(dimensions=dim, stream=True):

        # Got a filename.

        ntot = ntot + 1

        # Look for sam tape locations.

        is_on_tape = False
        sam_locs = samweb.locateFile(filenameorid=filename)
        for sam_loc in sam_locs:
            if sam_loc['location_type'] == 'tape':
                is_on_tape = True
                break

        if is_on_tape:
            print('On tape: %s' % filename)
        else:
            result = 1
            nbad = nbad + 1
            print('Not on tape: %s' % filename)

    print('%d files.' % ntot)
    print('%d files need to be stored on tape.' % nbad)

    return result
2478 
2479 # Copy files to workdir and issue jobsub submit command.
2480 # Return jobsubid.
2481 # Raise exception if jobsub_submit returns a nonzero status.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def docheck_tape
Definition: project.py:2432
def project.docleanx (   projects,
  projectname,
  stagename,
  clean_descendants = True 
)

Definition at line 519 of file project.py.

def docleanx(projects, projectname, stagename, clean_descendants = True):

    # Delete the outdir, logdir, workdir, and bookdir of all stages
    # beginning with the specified project/stage.  For empty
    # project/stage name, clean all stages.  If clean_descendants is
    # true, also clean any stage whose input filelist lives in an
    # already-cleaned bookdir (repeated until no new stage qualifies).
    #
    # For safety, only clean directories if the uid of the
    # directory owner matches the current uid or effective uid.
    # Do this even if the delete operation is allowed by filesystem
    # permissions (directories may be group- or public-write
    # because of batch system).

    print(projectname, stagename)

    uid = os.getuid()
    euid = os.geteuid()
    cleaned_bookdirs = []

    def _clean_dir(dir):
        # Delete directory tree dir, if it exists, after verifying that
        # it is owned by the current real or effective uid.  Raise
        # RuntimeError on an owner mismatch rather than deleting.
        if larbatch_posix.exists(dir):
            dir_uid = larbatch_posix.stat(dir).st_uid
            if dir_uid == uid or dir_uid == euid:
                print('Clean directory %s.' % dir)
                larbatch_posix.rmtree(dir)
            else:
                raise RuntimeError('Owner mismatch, delete %s manually.' % dir)

    # Clean iteratively, since cleaning one stage may make a descendant
    # stage eligible for cleaning on the next pass.

    done_cleaning = False
    while not done_cleaning:

        cleaned_something = False

        # Loop over projects and stages.

        for project in projects:
            for stage in project.stages:

                clean_this_stage = False

                # Skip this stage if it has already been cleaned.

                if not stage.bookdir in cleaned_bookdirs:

                    # Determine if this is the first stage we want to clean.

                    if (projectname == '' or project.name == projectname) and \
                            (stagename == '' or stage.name == stagename):

                        clean_this_stage = True

                    # Determine if we want to clean this stage because it uses
                    # an input filelist that lives in an already-cleaned bookdir.

                    elif clean_descendants and stage.inputlist != '' and \
                            os.path.dirname(stage.inputlist) in cleaned_bookdirs:

                        clean_this_stage = True

                # Do cleaning.  Each of the four stage directories gets the
                # same owner-checked recursive delete.

                if clean_this_stage:
                    cleaned_something = True
                    cleaned_bookdirs.append(stage.bookdir)

                    print('Clean project %s, stage %s' % (project.name, stage.name))

                    _clean_dir(stage.outdir)
                    _clean_dir(stage.logdir)
                    _clean_dir(stage.workdir)
                    _clean_dir(stage.bookdir)

        done_cleaning = not cleaned_something

    # Done.

    return
623 
 624 # Stage status function.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def docleanx
Definition: project.py:519
def project.dofetchlog (   project,
  stage 
)

Definition at line 1946 of file project.py.

def dofetchlog(project, stage):

    # This function fetches jobsub log files using command
    # jobsub_fetchlog.  Fetched log files are stored in a subdirectory
    # called "log" in the stage book directory.
    #
    # This function uses an algorithm to determine the log file
    # job id that is based on the worker environment as recorded in
    # file "env.txt" as returned from any worker.  Therefore, at least
    # one worker must have completed (successfully or not) for this
    # function to succeed.
    #
    # Returns 0 on success, 1 if no job ids could be determined.

    stage.checkinput()
    stage.checkdirs()

    def _extract_logid(env_lines, varname):
        # Scan env.txt lines ("NAME=VALUE") for varname and return the
        # corresponding jobsub id with its process number forced to
        # zero (cluster.0@server), or '' if varname is not found.
        for line in env_lines:
            name, sep, value = line.partition('=')
            if sep and name.strip() == varname:
                jobid = value.strip()
                logsplit = jobid.split('@', 1)
                cluster = logsplit[0].split('.', 1)[0]
                return cluster + '.0' + '@' + logsplit[1]
        return ''

    # Look for files called "env.txt" in any subdirectory of
    # stage.bookdir.  For each one, construct the log file id from
    # environment variable JOBSUBPARENTJOBID, falling back to
    # JOBSUBJOBID, in each case with the process number set to zero.

    logids = []
    for dirpath, dirnames, filenames in larbatch_posix.walk(stage.bookdir):
        for filename in filenames:
            if filename == 'env.txt':
                envpath = os.path.join(dirpath, filename)
                vars = larbatch_posix.readlines(envpath)
                logid = _extract_logid(vars, 'JOBSUBPARENTJOBID')
                if logid == '':
                    logid = _extract_logid(vars, 'JOBSUBJOBID')
                if logid != '':
                    logids.append(logid)

    # Process all of the log ids that we found.

    if len(logids) > 0:

        # Make a directory to receive log files.

        logdir = os.path.join(stage.bookdir, 'log')
        if larbatch_posix.exists(logdir):
            larbatch_posix.rmtree(logdir)
        larbatch_posix.mkdir(logdir)

        # Loop over unique log ids.

        for logid in set(logids):

            # Do the actual fetch.
            # Tarball is fetched into current directory, and unpacked
            # into log directory.

            print('Fetching log files for id %s' % logid)
            command = ['jobsub_fetchlog']
            if project.server != '-' and project.server != '':
                command.append('--jobsub-server=%s' % project.server)
            command.append('--jobid=%s' % logid)
            command.append('--dest-dir=%s' % logdir)
            jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            jobout, joberr = jobinfo.communicate()
            jobout = convert_str(jobout)
            joberr = convert_str(joberr)
            rc = jobinfo.poll()
            if rc != 0:
                raise JobsubError(command, rc, jobout, joberr)

        return 0

    else:

        # Done (failure).
        # If we fall out of the loop, we didn't find any files called env.txt, or
        # they didn't contain the right environment variables we need.
        # In this case, the most likely explanation is that no workers have
        # completed yet.

        print('Failed to fetch log files.')
        return 1
2066 
2067 
2068 # Check sam declarations.
2069 # Return 0 if all files are declared or don't have internal metadata.
 2070 # Return nonzero if some files have metadata but are not declared.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def dofetchlog
Definition: project.py:1946
def project.dojobsub (   project,
  stage,
  makeup,
  recur,
  dryrun 
)

Definition at line 2482 of file project.py.

2483 def dojobsub(project, stage, makeup, recur, dryrun):
2484 
2485  # Default return.
2486 
2487  jobid = ''
2488 
2489  # Process map, to be filled later if we need one.
2490 
2491  procmap = ''
2492 
2493  # Temporary directory where we will copy the batch script(s) and dag.
2494 
2495  tmpdir = tempfile.mkdtemp()
2496 
2497  # Temporary directory where we will copy files destined for stage.workdir.
2498 
2499  tmpworkdir = tempfile.mkdtemp()
2500 
2501  #we're going to let jobsub_submit copy the workdir contents for us
2502  #each file that would go into the workdir is going to be added with
2503  # '-f <input_file>' with the full path, it can be either BlueArc or /pnfs/uboone
2504 
2505  jobsub_workdir_files_args = []
2506 
2507  # If there is an input list, copy it to the work directory.
2508 
2509  input_list_name = ''
2510  if stage.inputlist != '':
2511  input_list_name = os.path.basename(stage.inputlist)
2512  work_list_name = os.path.join(tmpworkdir, input_list_name)
2513  if stage.inputlist != work_list_name:
2514  input_files = larbatch_posix.readlines(stage.inputlist)
2515  print('Making input list.')
2516  work_list = safeopen(work_list_name)
2517  for input_file in input_files:
2518  print('Adding input file %s' % input_file)
2519  work_list.write('%s\n' % input_file.strip())
2520  work_list.close()
2521  print('Done making input list.')
2522 
2523  # Now locate the fcl file on the fcl search path.
2524 
2525  fcls = project.get_fcl(stage.fclname)
2526 
2527  # Copy the fcl file to the work directory.
2528 
2529  for fcl in fcls:
2530  workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
2531  if os.path.abspath(fcl) != os.path.abspath(workfcl):
2532  larbatch_posix.copy(fcl, workfcl)
2533 
2534 
2535  # Construct a wrapper fcl file (called "wrapper.fcl") that will include
2536  # the original fcls, plus any overrides that are dynamically generated
2537  # in this script.
2538 
2539  #print 'Making wrapper.fcl'
2540  wrapper_fcl_name = os.path.join(tmpworkdir, 'wrapper.fcl')
2541  wrapper_fcl = safeopen(wrapper_fcl_name)
2542  stageNum = 0
2543  original_project_name = project.name
2544  original_stage_name = stage.name
2545  original_project_version = project.version
2546 
2547  for fcl in fcls:
2548  wrapper_fcl.write('#---STAGE %d\n' % stageNum)
2549  wrapper_fcl.write('#include "%s"\n' % os.path.basename(fcl))
2550  wrapper_fcl.write('\n')
2551 
2552  # Generate overrides for sam metadata fcl parameters.
2553  # Only do this if our xml file appears to contain sam metadata.
2554 
2555  xml_has_metadata = project.file_type != '' or \
2556  project.run_type != ''
2557  if xml_has_metadata:
2558 
2559  # Add overrides for FileCatalogMetadata.
2560 
2561  if project.release_tag != '':
2562  wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
2563  project.release_tag)
2564  else:
2565  wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "test"\n')
2566  if project.file_type:
2567  wrapper_fcl.write('services.FileCatalogMetadata.fileType: "%s"\n' % \
2568  project.file_type)
2569  if project.run_type:
2570  wrapper_fcl.write('services.FileCatalogMetadata.runType: "%s"\n' % \
2571  project.run_type)
2572 
2573 
2574  # Add experiment-specific sam metadata.
2575 
2576  if stageNum < len(stage.project_name) and stage.project_name[stageNum] != '':
2577  project.name = stage.project_name[stageNum]
2578  if stageNum < len(stage.stage_name) and stage.stage_name[stageNum] != '':
2579  stage.name = stage.stage_name[stageNum]
2580  if stageNum < len(stage.project_version) and stage.project_version[stageNum] != '':
2581  project.version = stage.project_version[stageNum]
2582  sam_metadata = project_utilities.get_sam_metadata(project, stage)
2583  if sam_metadata:
2584  wrapper_fcl.write(sam_metadata)
2585  project.name = original_project_name
2586  stage.name = original_stage_name
2587  project.version = original_project_version
2588 
2589  # In case of generator jobs, add override for pubs run number
2590  # (subrun number is overridden inside condor_lar.sh).
2591 
2592  if (not stage.pubs_input and stage.pubs_output) or stage.output_run:
2593  wrapper_fcl.write('source.firstRun: %d\n' % stage.output_run)
2594 
2595  # Add overrides for genie flux parameters.
2596  # This section will normally be generated for any kind of generator job,
2597  # and should be harmless for non-genie generators.
2598 
2599  if stage.maxfluxfilemb != 0 and stageNum == 0:
2600  wrapper_fcl.write('physics.producers.generator.FluxCopyMethod: "IFDH"\n')
2601  wrapper_fcl.write('physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
2602  wrapper_fcl.write('#---END_STAGE\n')
2603  stageNum = 1 + stageNum
2604 
2605  wrapper_fcl.close()
2606  #print 'Done making wrapper.fcl'
2607 
2608  # Get experiment setup script. Maybe copy to work directory.
2609  # After this section, either variable (not both) abssetupscript or
2610  # setupscript will be set to a non-null value.
2611 
2612  abssetupscript = project_utilities.get_setup_script_path()
2613  setupscript = ''
2614  if not abssetupscript.startswith('/cvmfs/'):
2615  setupscript = os.path.join(stage.workdir,'setup_experiment.sh')
2616  larbatch_posix.copy(abssetupscript, setupscript)
2617  jobsub_workdir_files_args.extend(['-f', setupscript])
2618  abssetupscript = ''
2619 
2620  # Copy and rename batch script to the work directory.
2621 
2622  if stage.batchname != '':
2623  workname = stage.batchname
2624  else:
2625  workname = '%s-%s-%s' % (stage.name, project.name, project.release_tag)
2626  workname = workname + os.path.splitext(stage.script)[1]
2627  #workscript = os.path.join(tmpworkdir, workname)
2628  workscript = os.path.join(tmpdir, workname)
2629  if stage.script != workscript:
2630  larbatch_posix.copy(stage.script, workscript)
2631 
2632  # Copy and rename sam start project script to work directory.
2633 
2634  workstartscript = ''
2635  workstartname = ''
2636  if stage.start_script != '':
2637  workstartname = 'start-%s' % workname
2638  #workstartscript = os.path.join(tmpworkdir, workstartname)
2639  workstartscript = os.path.join(tmpdir, workstartname)
2640  if stage.start_script != workstartscript:
2641  larbatch_posix.copy(stage.start_script, workstartscript)
2642 
2643  # Copy and rename sam stop project script to work directory.
2644 
2645  workstopscript = ''
2646  workstopname = ''
2647  if stage.stop_script != '':
2648  workstopname = 'stop-%s' % workname
2649  #workstopscript = os.path.join(tmpworkdir, workstopname)
2650  workstopscript = os.path.join(tmpdir, workstopname)
2651  if stage.stop_script != workstopscript:
2652  larbatch_posix.copy(stage.stop_script, workstopscript)
2653 
2654  # Copy worker initialization scripts to work directory.
2655 
2656  for init_script in stage.init_script:
2657  if init_script != '':
2658  if not larbatch_posix.exists(init_script):
2659  raise RuntimeError('Worker initialization script %s does not exist.\n' % \
2660  init_script)
2661  work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
2662  if init_script != work_init_script:
2663  larbatch_posix.copy(init_script, work_init_script)
2664 
2665  # Update stage.init_script from list to single script.
2666 
2667  n = len(stage.init_script)
2668  if n == 0:
2669  stage.init_script = ''
2670  elif n == 1:
2671  stage.init_script = stage.init_script[0]
2672  else:
2673 
2674  # If there are multiple init scripts, generate a wrapper init script init_wrapper.sh.
2675 
2676  work_init_wrapper = os.path.join(tmpworkdir, 'init_wrapper.sh')
2677  f = open(work_init_wrapper, 'w')
2678  f.write('#! /bin/bash\n')
2679  for init_script in stage.init_script:
2680  f.write('echo\n')
2681  f.write('echo "Executing %s"\n' % os.path.basename(init_script))
2682  f.write('./%s\n' % os.path.basename(init_script))
2683  f.write('status=$?\n')
2684  f.write('echo "%s finished with status $status"\n' % os.path.basename(init_script))
2685  f.write('if [ $status -ne 0 ]; then\n')
2686  f.write(' exit $status\n')
2687  f.write('fi\n')
2688  f.write('echo\n')
2689  f.write('echo "Done executing initialization scripts."\n')
2690  f.close()
2691  stage.init_script = work_init_wrapper
2692 
2693  # Copy worker initialization source scripts to work directory.
2694 
2695  for init_source in stage.init_source:
2696  if init_source != '':
2697  if not larbatch_posix.exists(init_source):
2698  raise RuntimeError('Worker initialization source script %s does not exist.\n' % \
2699  init_source)
2700  work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
2701  if init_source != work_init_source:
2702  larbatch_posix.copy(init_source, work_init_source)
2703 
2704  # Update stage.init_source from list to single script.
2705 
2706  n = len(stage.init_source)
2707  if n == 0:
2708  stage.init_source = ''
2709  elif n == 1:
2710  stage.init_source = stage.init_source[0]
2711  else:
2712 
2713  # If there are multiple init source scripts, generate a wrapper init script
2714  # init_source_wrapper.sh.
2715 
2716  work_init_source_wrapper = os.path.join(tmpworkdir, 'init_source_wrapper.sh')
2717  f = open(work_init_source_wrapper, 'w')
2718  for init_source in stage.init_source:
2719  f.write('echo\n')
2720  f.write('echo "Sourcing %s"\n' % os.path.basename(init_source))
2721  f.write('source %s\n' % os.path.basename(init_source))
2722  f.write('echo\n')
2723  f.write('echo "Done sourcing initialization scripts."\n')
2724  f.close()
2725  stage.init_source = work_init_source_wrapper
2726 
2727  # Copy worker end-of-job scripts to work directory.
2728 
2729  for end_script in stage.end_script:
2730  if end_script != '':
2731  if not larbatch_posix.exists(end_script):
2732  raise RuntimeError('Worker end-of-job script %s does not exist.\n' % end_script)
2733  work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
2734  if end_script != work_end_script:
2735  larbatch_posix.copy(end_script, work_end_script)
2736 
2737  # Update stage.end_script from list to single script.
2738 
2739  n = len(stage.end_script)
2740  if n == 0:
2741  stage.end_script = ''
2742  elif n == 1:
2743  stage.end_script = stage.end_script[0]
2744  else:
2745 
2746  # If there are multiple end scripts, generate a wrapper end script end_wrapper.sh.
2747 
2748  work_end_wrapper = os.path.join(tmpworkdir, 'end_wrapper.sh')
2749  f = open(work_end_wrapper, 'w')
2750  f.write('#! /bin/bash\n')
2751  for end_script in stage.end_script:
2752  f.write('echo\n')
2753  f.write('echo "Executing %s"\n' % os.path.basename(end_script))
2754  f.write('./%s\n' % os.path.basename(end_script))
2755  f.write('status=$?\n')
2756  f.write('echo "%s finished with status $status"\n' % os.path.basename(end_script))
2757  f.write('if [ $status -ne 0 ]; then\n')
2758  f.write(' exit $status\n')
2759  f.write('fi\n')
2760  f.write('echo\n')
2761  f.write('echo "Done executing finalization scripts."\n')
2762  f.close()
2763  stage.end_script = work_end_wrapper
2764 
2765  # Copy worker midstage source initialization scripts to work directory.
2766 
2767  for istage in stage.mid_source:
2768  for mid_source in stage.mid_source[istage]:
2769  if mid_source != '':
2770  if not larbatch_posix.exists(mid_source):
2771  raise RuntimeError('Worker midstage initialization source script %s does not exist.\n' % mid_source)
2772  work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
2773  if mid_source != work_mid_source:
2774  larbatch_posix.copy(mid_source, work_mid_source)
2775 
2776  # Generate midstage source initialization wrapper script mid_source_wrapper.sh
2777  # and update stage.mid_script to point to wrapper.
2778  # Note that variable $stage should be defined external to this script.
2779 
2780  if len(stage.mid_source) > 0:
2781  work_mid_source_wrapper = os.path.join(tmpworkdir, 'mid_source_wrapper.sh')
2782  f = open(work_mid_source_wrapper, 'w')
2783  for istage in stage.mid_source:
2784  for mid_source in stage.mid_source[istage]:
2785  f.write('if [ $stage -eq %d ]; then\n' % istage)
2786  f.write(' echo\n')
2787  f.write(' echo "Sourcing %s"\n' % os.path.basename(mid_source))
2788  f.write(' source %s\n' % os.path.basename(mid_source))
2789  f.write('fi\n')
2790  f.write('echo\n')
2791  f.write('echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
2792  f.close()
2793  stage.mid_source = work_mid_source_wrapper
2794  else:
2795  stage.mid_source = ''
2796 
2797  # Copy worker midstage finalization scripts to work directory.
2798 
2799  for istage in stage.mid_script:
2800  for mid_script in stage.mid_script[istage]:
2801  if mid_script != '':
2802  if not larbatch_posix.exists(mid_script):
2803  raise RuntimeError('Worker midstage finalization script %s does not exist.\n' % mid_script)
2804  work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
2805  if mid_script != work_mid_script:
2806  larbatch_posix.copy(mid_script, work_mid_script)
2807 
2808  # Generate midstage finalization wrapper script mid_wrapper.sh and update stage.mid_script
2809  # to point to wrapper.
2810 
2811  if len(stage.mid_script) > 0:
2812  work_mid_wrapper = os.path.join(tmpworkdir, 'mid_wrapper.sh')
2813  f = open(work_mid_wrapper, 'w')
2814  f.write('#! /bin/bash\n')
2815  f.write('stage=$1\n')
2816  for istage in stage.mid_script:
2817  for mid_script in stage.mid_script[istage]:
2818  f.write('if [ $stage -eq %d ]; then\n' % istage)
2819  f.write(' echo\n')
2820  f.write(' echo "Executing %s"\n' % os.path.basename(mid_script))
2821  f.write(' ./%s\n' % os.path.basename(mid_script))
2822  f.write(' status=$?\n')
2823  f.write(' echo "%s finished with status $status"\n' % os.path.basename(mid_script))
2824  f.write(' if [ $status -ne 0 ]; then\n')
2825  f.write(' exit $status\n')
2826  f.write(' fi\n')
2827  f.write('fi\n')
2828  f.write('echo\n')
2829  f.write('echo "Done executing midstage finalization scripts for stage $stage."\n')
2830  f.close()
2831  stage.mid_script = work_mid_wrapper
2832  else:
2833  stage.mid_script = ''
2834 
2835  # Copy helper scripts to work directory.
2836 
2837  helpers = ('root_metadata.py',
2838  'merge_json.py',
2839  'subruns.py',
2840  'validate_in_job.py',
2841  'mkdir.py',
2842  'emptydir.py',
2843  'file_to_url.sh')
2844 
2845  for helper in helpers:
2846 
2847  # Find helper script in execution path.
2848 
2849  jobinfo = subprocess.Popen(['which', helper],
2850  stdout=subprocess.PIPE,
2851  stderr=subprocess.PIPE)
2852  jobout, joberr = jobinfo.communicate()
2853  jobout = convert_str(jobout)
2854  joberr = convert_str(joberr)
2855  rc = jobinfo.poll()
2856  helper_path = jobout.splitlines()[0].strip()
2857  if rc == 0:
2858  work_helper = os.path.join(tmpworkdir, helper)
2859  if helper_path != work_helper:
2860  larbatch_posix.copy(helper_path, work_helper)
2861  else:
2862  print('Helper script %s not found.' % helper)
2863 
2864  # Copy helper python modules to work directory.
2865  # Note that for this to work, these modules must be single files.
2866 
2867  helper_modules = ('larbatch_posix',
2868  'project_utilities',
2869  'larbatch_utilities',
2870  'experiment_utilities',
2871  'extractor_dict')
2872 
2873  for helper_module in helper_modules:
2874 
2875  # Find helper module files.
2876 
2877  jobinfo = subprocess.Popen(['python'],
2878  stdin=subprocess.PIPE,
2879  stdout=subprocess.PIPE,
2880  stderr=subprocess.PIPE)
2881  cmd = 'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
2882  jobinfo.stdin.write(convert_bytes(cmd))
2883  jobout, joberr = jobinfo.communicate()
2884  jobout = convert_str(jobout)
2885  joberr = convert_str(joberr)
2886  rc = jobinfo.poll()
2887  helper_path = jobout.splitlines()[-1].strip()
2888  if rc == 0:
2889  #print 'helper_path = %s' % helper_path
2890  work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
2891  if helper_path != work_helper:
2892  larbatch_posix.copy(helper_path, work_helper)
2893  else:
2894  print('Helper python module %s not found.' % helper_module)
2895 
2896  # If this is a makeup action, find list of missing files.
2897  # If sam information is present (cpids.list), create a makeup dataset.
2898 
2899  if makeup:
2900 
2901  checked_file = os.path.join(stage.bookdir, 'checked')
2902  if not larbatch_posix.exists(checked_file):
2903  raise RuntimeError('Wait for any running jobs to finish and run project.py --check')
2904  makeup_count = 0
2905 
2906  # First delete bad worker subdirectories.
2907 
2908  bad_filename = os.path.join(stage.bookdir, 'bad.list')
2909  if larbatch_posix.exists(bad_filename):
2910  lines = larbatch_posix.readlines(bad_filename)
2911  for line in lines:
2912  bad_subdir = line.strip()
2913  if bad_subdir != '':
2914  bad_path = os.path.join(stage.outdir, bad_subdir)
2915  if larbatch_posix.exists(bad_path):
2916  print('Deleting %s' % bad_path)
2917  larbatch_posix.rmtree(bad_path)
2918  bad_path = os.path.join(stage.logdir, bad_subdir)
2919  if larbatch_posix.exists(bad_path):
2920  print('Deleting %s' % bad_path)
2921  larbatch_posix.rmtree(bad_path)
2922  bad_path = os.path.join(stage.bookdir, bad_subdir)
2923  if larbatch_posix.exists(bad_path):
2924  print('Deleting %s' % bad_path)
2925  larbatch_posix.rmtree(bad_path)
2926 
2927  # Get a list of missing files, if any, for file list input.
2928  # Regenerate the input file list in the work directory, and
2929  # set the makeup job count.
2930 
2931  missing_files = []
2932  if stage.inputdef == '':
2933  missing_filename = os.path.join(stage.bookdir, 'missing_files.list')
2934  if larbatch_posix.exists(missing_filename):
2935  lines = larbatch_posix.readlines(missing_filename)
2936  for line in lines:
2937  words = line.split()
2938  if len(words) > 0:
2939  missing_files.append(words[0])
2940  makeup_count = len(missing_files)
2941  print('Makeup list contains %d files.' % makeup_count)
2942 
2943  if input_list_name != '':
2944  work_list_name = os.path.join(tmpworkdir, input_list_name)
2945  if larbatch_posix.exists(work_list_name):
2946  larbatch_posix.remove(work_list_name)
2947  work_list = safeopen(work_list_name)
2948  for missing_file in missing_files:
2949  work_list.write('%s\n' % missing_file)
2950  work_list.close()
2951 
2952  # In case of making up generation jobs, produce a procmap file
2953  # for missing jobs that will ensure that made up generation
2954  # jobs get a unique subrun.
2955 
2956  if stage.inputdef == '' and stage.inputfile == '' and stage.inputlist == '':
2957  procs = set(range(stage.num_jobs))
2958 
2959  # Loop over good output files to extract existing
2960  # process numbers and determine missing process numbers.
2961 
2962  output_files = os.path.join(stage.bookdir, 'files.list')
2963  if larbatch_posix.exists(output_files):
2964  lines = larbatch_posix.readlines(output_files)
2965  for line in lines:
2966  dir = os.path.basename(os.path.dirname(line))
2967  dir_parts = dir.split('_')
2968  if len(dir_parts) > 1:
2969  proc = int(dir_parts[1])
2970  if proc in procs:
2971  procs.remove(proc)
2972  if len(procs) != makeup_count:
2973  raise RuntimeError('Makeup process list has different length than makeup count.')
2974 
2975  # Generate process map.
2976 
2977  if len(procs) > 0:
2978  procmap = 'procmap.txt'
2979  procmap_path = os.path.join(tmpworkdir, procmap)
2980  procmap_file = safeopen(procmap_path)
2981  for proc in procs:
2982  procmap_file.write('%d\n' % proc)
2983  procmap_file.close()
2984 
2985  # Prepare sam-related makeup information.
2986 
2987  import_samweb()
2988 
2989  # Get list of successful consumer process ids.
2990 
2991  cpids = []
2992  cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
2993  if larbatch_posix.exists(cpids_filename):
2994  cpids_files = larbatch_posix.readlines(cpids_filename)
2995  for line in cpids_files:
2996  cpids.append(line.strip())
2997 
2998  # Create makeup dataset definition.
2999 
3000  makeup_defname = ''
3001  if len(cpids) > 0:
3002  project_utilities.test_kca()
3003  makeup_defname = samweb.makeProjectName(stage.inputdef) + '_makeup'
3004 
3005  # Construct comma-separated list of consumer process ids.
3006 
3007  cpids_list = ''
3008  sep = ''
3009  for cpid in cpids:
3010  cpids_list = cpids_list + '%s%s' % (sep, cpid)
3011  sep = ','
3012 
3013  # Construct makeup dimension.
3014 
3015  dim = '(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)
3016 
3017  # Create makeup dataset definition.
3018 
3019  print('Creating makeup sam dataset definition %s' % makeup_defname)
3020  project_utilities.test_kca()
3021  samweb.createDefinition(defname=makeup_defname, dims=dim)
3022  makeup_count = samweb.countFiles(defname=makeup_defname)
3023  print('Makeup dataset contains %d files.' % makeup_count)
3024 
3025  # Make a tarball out of all of the files in tmpworkdir in stage.workdir
3026 
3027  tmptar = '%s/work.tar' % tmpworkdir
3028  jobinfo = subprocess.Popen(['tar','-cf', tmptar, '-C', tmpworkdir,
3029  '--mtime=2018-01-01',
3030  '--exclude=work.tar', '.'],
3031  stdout=subprocess.PIPE,
3032  stderr=subprocess.PIPE)
3033  jobout, joberr = jobinfo.communicate()
3034  rc = jobinfo.poll()
3035  if rc != 0:
3036  raise RuntimeError('Failed to create work tarball in %s' % tmpworkdir)
3037 
3038  # Calculate the checksum of the tarball.
3039 
3040  hasher = hashlib.md5()
3041  f = open(tmptar, 'rb')
3042  buf = f.read(1024)
3043  while len(buf) > 0:
3044  hasher.update(buf)
3045  buf = f.read(1024)
3046  hash = hasher.hexdigest()
3047  f.close()
3048 
3049  # Transfer tarball to work directory.
3050  # Give the tarball a unique name based on its checksum.
3051  # Don't replace the tarball if it already exists.
3052 
3053  hashtar = '%s/work%s.tar' % (stage.workdir, hash)
3054  if not larbatch_posix.exists(hashtar):
3055  larbatch_posix.copy(tmptar, hashtar)
3056  jobsub_workdir_files_args.extend(['-f', hashtar])
3057 
3058  # Sam stuff.
3059 
3060  # Get input sam dataset definition name.
3061  # Can be from xml or a makeup dataset that we just created.
3062 
3063  inputdef = stage.inputdef
3064  if makeup and makeup_defname != '':
3065  inputdef = makeup_defname
3066 
3067  # Sam project name.
3068 
3069  prjname = ''
3070  if inputdef != '':
3071  import_samweb()
3072  project_utilities.test_kca()
3073  prjname = samweb.makeProjectName(inputdef)
3074 
3075  # Get mix input sam dataset definition name.
3076 
3077  mixprjname = ''
3078  if stage.mixinputdef != '':
3079  import_samweb()
3080  project_utilities.test_kca()
3081  mixprjname = 'mix_%s' % samweb.makeProjectName(stage.mixinputdef)
3082 
3083  # If the prestart flag is specified, start the sam project now.
3084 
3085  prj_started = False
3086  if prjname != '' and stage.prestart != 0:
3087  ok = project_utilities.start_project(inputdef, prjname,
3088  stage.num_jobs * stage.max_files_per_job,
3089  stage.recur, stage.filelistdef)
3090  if ok != 0:
3091  print('Failed to start project.')
3092  sys.exit(1)
3093  prj_started = True
3094 
3095  # Also start mix project, if any.
3096 
3097  if mixprjname != '' and prj_started:
3098  ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
3099  if ok != 0:
3100  print('Failed to start mix project.')
3101  sys.exit(1)
3102 
3103  # Get role
3104 
3105  role = project_utilities.get_role()
3106  if project.role != '':
3107  role = project.role
3108 
3109  # Construct jobsub command line for workers.
3110 
3111  command = ['jobsub_submit']
3112  command_njobs = 1
3113 
3114  # Jobsub options.
3115 
3116  command.append('--group=%s' % project_utilities.get_experiment())
3117  command.append('--role=%s' % role)
3118  command.extend(jobsub_workdir_files_args)
3119  if project.server != '-' and project.server != '':
3120  command.append('--jobsub-server=%s' % project.server)
3121  if stage.resource != '':
3122  command.append('--resource-provides=usage_model=%s' % stage.resource)
3123  elif project.resource != '':
3124  command.append('--resource-provides=usage_model=%s' % project.resource)
3125  if stage.lines != '':
3126  command.append('--lines=%s' % stage.lines)
3127  elif project.lines != '':
3128  command.append('--lines=%s' % project.lines)
3129  if stage.site != '':
3130  command.append('--site=%s' % stage.site)
3131  if stage.blacklist != '':
3132  command.append('--blacklist=%s' % stage.blacklist)
3133  if stage.cpu != 0:
3134  command.append('--cpu=%d' % stage.cpu)
3135  if stage.disk != '':
3136  command.append('--disk=%s' % stage.disk)
3137  if stage.memory != 0:
3138  command.append('--memory=%d' % stage.memory)
3139  if project.os != '':
3140  if stage.singularity == 0:
3141  command.append('--OS=%s' % project.os)
3142  else:
3143  p = project_utilities.get_singularity(project.os)
3144  if p != '':
3145  if (stage.num_jobs > 1 or project.force_dag) and \
3146  (inputdef != '' or stage.mixinputdef != '') :
3147  command.append(r"""--lines='+SingularityImage=\"%s\"'""" % p)
3148  else:
3149  command.append(r"""--lines='+SingularityImage="%s"'""" % p)
3150  else:
3151  raise RuntimeError('No singularity image found for %s' % project.os)
3152  if not stage.pubs_output:
3153  if not makeup:
3154  command_njobs = stage.num_jobs
3155  command.extend(['-N', '%d' % command_njobs])
3156  else:
3157  command_njobs = min(makeup_count, stage.num_jobs)
3158  command.extend(['-N', '%d' % command_njobs])
3159  else:
3160  if stage.inputdef != '':
3161  command_njobs = stage.num_jobs
3162  else:
3163  command_njobs = stage.num_jobs
3164  command.extend(['-N', '%d' % command_njobs])
3165  if stage.jobsub != '':
3166  for word in stage.jobsub.split():
3167  command.append(word)
3168  opt = project_utilities.default_jobsub_submit_options()
3169  if opt != '':
3170  for word in opt.split():
3171  command.append(word)
3172  if stage.cvmfs != 0:
3173  command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3174  if stage.stash != 0:
3175  command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3176  if stage.singularity != 0:
3177  command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3178 
3179  # Batch script.
3180 
3181  workurl = "file://%s" % workscript
3182  command.append(workurl)
3183 
3184  # check if there is a request for max num of files per job
3185  # and add that if to the condor_lar.sh line
3186 
3187  if stage.max_files_per_job != 0:
3188  command_max_files_per_job = stage.max_files_per_job
3189  command.extend(['--nfile', '%d' % command_max_files_per_job])
3190  #print 'Setting the max files to %d' % command_max_files_per_job
3191 
3192  # Larsoft options.
3193 
3194  command.extend([' --group', project_utilities.get_experiment()])
3195  command.extend([' -g'])
3196  command.extend([' -c', 'wrapper.fcl'])
3197  command.extend([' --ups', ','.join(project.ups)])
3198  if project.release_tag != '':
3199  command.extend([' -r', project.release_tag])
3200  command.extend([' -b', project.release_qual])
3201  if project.local_release_dir != '':
3202  command.extend([' --localdir', project.local_release_dir])
3203  if project.local_release_tar != '':
3204  command.extend([' --localtar', project.local_release_tar])
3205  command.extend([' --workdir', stage.workdir])
3206  command.extend([' --outdir', stage.outdir])
3207  command.extend([' --logdir', stage.logdir])
3208  if stage.dirsize > 0:
3209  command.extend([' --dirsize', '%d' % stage.dirsize])
3210  if stage.dirlevels > 0:
3211  command.extend([' --dirlevels', '%d' % stage.dirlevels])
3212  if stage.exe:
3213  if type(stage.exe) == type([]):
3214  command.extend([' --exe', ':'.join(stage.exe)])
3215  else:
3216  command.extend([' --exe', stage.exe])
3217  if stage.schema != '':
3218  command.extend([' --sam_schema', stage.schema])
3219  if project.os != '':
3220  command.extend([' --os', project.os])
3221 
3222  # Set the process number for pubs jobs that are the first in the chain.
3223 
3224  if not stage.pubs_input and stage.pubs_output and stage.output_subruns[0] > 0:
3225  command.extend(['--process', '%d' % (stage.output_subruns[0]-1)])
3226 
3227  # Specify single worker mode in case of pubs output.
3228 
3229  if stage.dynamic:
3230  command.append('--single')
3231 
3232  if stage.inputfile != '':
3233  command.extend([' -s', stage.inputfile])
3234  elif input_list_name != '':
3235  command.extend([' -S', input_list_name])
3236  elif inputdef != '':
3237  command.extend([' --sam_defname', inputdef,
3238  ' --sam_project', prjname])
3239  if recur:
3240  command.extend([' --recur'])
3241  if stage.mixinputdef != '':
3242  command.extend([' --mix_defname', stage.mixinputdef,
3243  ' --mix_project', mixprjname])
3244  if stage.inputmode != '':
3245  command.extend([' --inputmode', stage.inputmode])
3246  command.extend([' -n', '%d' % stage.num_events])
3247  if stage.inputdef == '':
3248  command.extend([' --njobs', '%d' % stage.num_jobs ])
3249  for ftype in stage.datafiletypes:
3250  command.extend(['--data_file_type', ftype])
3251  if procmap != '':
3252  command.extend([' --procmap', procmap])
3253  if stage.output:
3254  if type(stage.output) == type([]):
3255  command.extend([' --output', ':'.join(stage.output)])
3256  else:
3257  command.extend([' --output', stage.output])
3258  if stage.TFileName != '':
3259  command.extend([' --TFileName', stage.TFileName])
3260  if stage.init_script != '':
3261  command.extend([' --init-script', os.path.basename(stage.init_script)])
3262  if stage.init_source != '':
3263  command.extend([' --init-source', os.path.basename(stage.init_source)])
3264  if stage.end_script != '':
3265  command.extend([' --end-script', os.path.basename(stage.end_script)])
3266  if stage.mid_source != '':
3267  command.extend([' --mid-source', os.path.basename(stage.mid_source)])
3268  if stage.mid_script != '':
3269  command.extend([' --mid-script', os.path.basename(stage.mid_script)])
3270  if abssetupscript != '':
3271  command.extend([' --init', abssetupscript])
3272 
3273 
3274  #print 'Will Validation will be done on the worker node %d' % stage.validate_on_worker
3275  if stage.validate_on_worker == 1:
3276  print('Validation will be done on the worker node %d' % stage.validate_on_worker)
3277  command.extend([' --validate'])
3278  command.extend([' --declare'])
3279  # Maintain parentage only if we have multiple fcl files and thus are running in multiple stages
3280  if type(stage.fclname) == type([]) and len(stage.fclname) > 1:
3281  command.extend([' --maintain_parentage'])
3282 
3283  if stage.copy_to_fts == 1:
3284  command.extend([' --copy'])
3285 
3286  # If input is from sam, also construct a dag file, or add --sam_start option.
3287 
3288  if (prjname != '' or mixprjname != '') and command_njobs == 1 and not project.force_dag and not prj_started:
3289  command.extend([' --sam_start',
3290  ' --sam_station', project_utilities.get_experiment(),
3291  ' --sam_group', project_utilities.get_experiment()])
3292 
3293 
3294  # At this point, the main batch worker command is complete.
3295  # Decide whether to submit this command stand alone or as part of a dag.
3296 
3297  start_commands = []
3298  stop_commands = []
3299  dag_prjs = []
3300  if command_njobs > 1 or project.force_dag:
3301  if inputdef != '':
3302  dag_prjs.append([inputdef, prjname])
3303  if stage.mixinputdef != '':
3304  dag_prjs.append([stage.mixinputdef, mixprjname])
3305 
3306  for dag_prj in dag_prjs:
3307 
3308  # At this point, it is an error if the start and stop project
3309  # scripts were not found.
3310 
3311  if workstartname == '' or workstopname == '':
3312  raise RuntimeError('Sam start or stop project script not found.')
3313 
3314  # Start project jobsub command.
3315 
3316  start_command = ['jobsub']
3317 
3318  # General options.
3319 
3320  start_command.append('--group=%s' % project_utilities.get_experiment())
3321  if setupscript != '':
3322  start_command.append('-f %s' % setupscript)
3323  #start_command.append('--role=%s' % role)
3324  if stage.resource != '':
3325  start_command.append('--resource-provides=usage_model=%s' % stage.resource)
3326  elif project.resource != '':
3327  start_command.append('--resource-provides=usage_model=%s' % project.resource)
3328  if stage.lines != '':
3329  start_command.append('--lines=%s' % stage.lines)
3330  elif project.lines != '':
3331  start_command.append('--lines=%s' % project.lines)
3332  if stage.site != '':
3333  start_command.append('--site=%s' % stage.site)
3334  if stage.blacklist != '':
3335  start_command.append('--blacklist=%s' % stage.blacklist)
3336  if project.os != '':
3337  if stage.singularity == 0:
3338  start_command.append('--OS=%s' % project.os)
3339  else:
3340  p = project_utilities.get_singularity(project.os)
3341  if p != '':
3342  start_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
3343  else:
3344  raise RuntimeError('No singularity image found for %s' % project.os)
3345  if stage.jobsub_start != '':
3346  for word in stage.jobsub_start.split():
3347  start_command.append(word)
3348  opt = project_utilities.default_jobsub_submit_options()
3349  if opt != '':
3350  for word in opt.split():
3351  start_command.append(word)
3352  if stage.cvmfs != 0:
3353  start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3354  if stage.stash != 0:
3355  start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3356  if stage.singularity != 0:
3357  start_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3358 
3359  # Start project script.
3360 
3361  workstarturl = "file://%s" % workstartscript
3362  start_command.append(workstarturl)
3363 
3364  # Sam options.
3365 
3366  start_command.extend([' --sam_station', project_utilities.get_experiment(),
3367  ' --sam_group', project_utilities.get_experiment(),
3368  ' --sam_defname', dag_prj[0],
3369  ' --sam_project', dag_prj[1],
3370  ' -g'])
3371  if recur:
3372  start_command.extend([' --recur'])
3373 
3374  if abssetupscript != '':
3375  start_command.extend([' --init', abssetupscript])
3376 
3377  if stage.num_jobs > 0 and stage.max_files_per_job > 0:
3378  start_command.extend([' --max_files', '%d' % (stage.num_jobs * stage.max_files_per_job)])
3379 
3380  if stage.prestagefraction > 0.:
3381  start_command.extend([' --prestage_fraction', '%f' % stage.prestagefraction])
3382 
3383  # Output directory.
3384 
3385  start_command.extend([' --logdir', stage.logdir])
3386 
3387  # Done with start command.
3388 
3389  if not prj_started or stage.prestagefraction > 0.:
3390  start_commands.append(start_command)
3391 
3392  # Stop project jobsub command.
3393 
3394  stop_command = ['jobsub']
3395 
3396  # General options.
3397 
3398  stop_command.append('--group=%s' % project_utilities.get_experiment())
3399  if setupscript != '':
3400  stop_command.append('-f %s' % setupscript)
3401  #stop_command.append('--role=%s' % role)
3402  if stage.resource != '':
3403  stop_command.append('--resource-provides=usage_model=%s' % stage.resource)
3404  elif project.resource != '':
3405  stop_command.append('--resource-provides=usage_model=%s' % project.resource)
3406  if stage.lines != '':
3407  stop_command.append('--lines=%s' % stage.lines)
3408  elif project.lines != '':
3409  stop_command.append('--lines=%s' % project.lines)
3410  if stage.site != '':
3411  stop_command.append('--site=%s' % stage.site)
3412  if stage.blacklist != '':
3413  stop_command.append('--blacklist=%s' % stage.blacklist)
3414  if project.os != '':
3415  if stage.singularity == 0:
3416  stop_command.append('--OS=%s' % project.os)
3417  else:
3418  p = project_utilities.get_singularity(project.os)
3419  if p != '':
3420  stop_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
3421  else:
3422  raise RuntimeError('No singularity image found for %s' % project.os)
3423  if stage.jobsub_start != '':
3424  for word in stage.jobsub_start.split():
3425  stop_command.append(word)
3426  opt = project_utilities.default_jobsub_submit_options()
3427  if opt != '':
3428  for word in opt.split():
3429  stop_command.append(word)
3430  if stage.cvmfs != 0:
3431  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3432  if stage.stash != 0:
3433  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3434  if stage.singularity != 0:
3435  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3436 
3437  # Stop project script.
3438 
3439  workstopurl = "file://%s" % workstopscript
3440  stop_command.append(workstopurl)
3441 
3442  # Sam options.
3443 
3444  stop_command.extend([' --sam_station', project_utilities.get_experiment(),
3445  ' --sam_project', dag_prj[1],
3446  ' -g'])
3447 
3448  # Output directory.
3449 
3450  stop_command.extend([' --logdir', stage.logdir])
3451 
3452  if abssetupscript != '':
3453  stop_command.extend([' --init', abssetupscript])
3454 
3455  # Done with stop command.
3456 
3457  stop_commands.append(stop_command)
3458 
3459  if len(start_commands) > 0 or len(stop_commands) > 0:
3460 
3461  # Create dagNabbit.py configuration script in the work directory.
3462 
3463  dagfilepath = os.path.join(tmpdir, 'submit.dag')
3464  dag = safeopen(dagfilepath)
3465  dag.write('<serial>\n')
3466 
3467  # Write start section.
3468 
3469  if len(start_commands) > 0:
3470  dag.write('\n<parallel>\n\n')
3471  for start_command in start_commands:
3472  first = True
3473  for word in start_command:
3474  if not first:
3475  dag.write(' ')
3476  dag.write(word)
3477  if word[:6] == 'jobsub':
3478  dag.write(' -n')
3479  first = False
3480  dag.write('\n\n')
3481  dag.write('</parallel>\n')
3482 
3483  # Write main section.
3484 
3485  dag.write('\n<parallel>\n\n')
3486  for process in range(command_njobs):
3487  #for process in range(1):
3488  first = True
3489  skip = False
3490  for word in command:
3491  if skip:
3492  skip = False
3493  else:
3494  if word == '-N':
3495  #if False:
3496  skip = True
3497  else:
3498  if not first:
3499  dag.write(' ')
3500  if word[:6] == 'jobsub':
3501  word = 'jobsub'
3502  if word[:7] == '--role=':
3503  word = ''
3504  if word.startswith('--jobsub-server='):
3505  word = ''
3506  word = project_utilities.dollar_escape(word)
3507  dag.write(word)
3508  if word[:6] == 'jobsub':
3509  dag.write(' -n')
3510  first = False
3511  dag.write(' --process %d\n' % process)
3512  dag.write('\n')
3513  dag.write('\n</parallel>\n')
3514 
3515  # Write stop section.
3516 
3517  if len(stop_commands) > 0:
3518  dag.write('\n<parallel>\n\n')
3519  for stop_command in stop_commands:
3520  first = True
3521  for word in stop_command:
3522  if not first:
3523  dag.write(' ')
3524  dag.write(word)
3525  if word[:6] == 'jobsub':
3526  dag.write(' -n')
3527  first = False
3528  dag.write('\n\n')
3529  dag.write('</parallel>\n')
3530 
3531  # Finish dag.
3532 
3533  dag.write('\n</serial>\n')
3534  dag.close()
3535 
3536  # Update the main submission command to use jobsub_submit_dag instead of jobsub_submit.
3537 
3538  command = ['jobsub_submit_dag']
3539  command.append('--group=%s' % project_utilities.get_experiment())
3540  if project.server != '-' and project.server != '':
3541  command.append('--jobsub-server=%s' % project.server)
3542  command.append('--role=%s' % role)
3543  dagfileurl = 'file://'+ dagfilepath
3544  command.append(dagfileurl)
3545 
3546  checked_file = os.path.join(stage.bookdir, 'checked')
3547 
3548  # Calculate submit timeout.
3549 
3550  submit_timeout = 3600000
3551  if prjname != '':
3552  submit_timeout += 1.0 * command_njobs
3553  if stage.jobsub_timeout > submit_timeout:
3554  submit_timeout = stage.jobsub_timeout
3555 
3556  # Submit jobs.
3557 
3558  if not makeup:
3559 
3560  # For submit action, invoke the job submission command.
3561 
3562  print('Invoke jobsub_submit')
3563  if dryrun:
3564  print(' '.join(command))
3565  else:
3566  q = queue.Queue()
3567  jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
3568  thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
3569  thread.start()
3570  thread.join(timeout=submit_timeout)
3571  if thread.is_alive():
3572  jobinfo.terminate()
3573  thread.join()
3574  rc = q.get()
3575  jobout = convert_str(q.get())
3576  joberr = convert_str(q.get())
3577  if larbatch_posix.exists(checked_file):
3578  larbatch_posix.remove(checked_file)
3579  if larbatch_posix.isdir(tmpdir):
3580  larbatch_posix.rmtree(tmpdir)
3581  if larbatch_posix.isdir(tmpworkdir):
3582  larbatch_posix.rmtree(tmpworkdir)
3583  if rc != 0:
3584  raise JobsubError(command, rc, jobout, joberr)
3585  for line in jobout.split('\n'):
3586  if "JobsubJobId" in line:
3587  jobid = line.strip().split()[-1]
3588  if not jobid:
3589  raise JobsubError(command, rc, jobout, joberr)
3590  print('jobsub_submit finished.')
3591 
3592  else:
3593 
3594  # For makeup action, abort if makeup job count is zero for some reason.
3595 
3596  if makeup_count > 0:
3597  if dryrun:
3598  print(' '.join(command))
3599  else:
3600  q = queue.Queue()
3601  jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
3602  thread = threading.Thread(target=project_utilities.wait_for_subprocess,
3603  args=[jobinfo, q])
3604  thread.start()
3605  thread.join(timeout=submit_timeout)
3606  if thread.is_alive():
3607  jobinfo.terminate()
3608  thread.join()
3609  rc = q.get()
3610  jobout = convert_str(q.get())
3611  joberr = convert_str(q.get())
3612  if larbatch_posix.exists(checked_file):
3613  larbatch_posix.remove(checked_file)
3614  if larbatch_posix.isdir(tmpdir):
3615  larbatch_posix.rmtree(tmpdir)
3616  if larbatch_posix.isdir(tmpworkdir):
3617  larbatch_posix.rmtree(tmpworkdir)
3618  if rc != 0:
3619  raise JobsubError(command, rc, jobout, joberr)
3620  for line in jobout.split('\n'):
3621  if "JobsubJobId" in line:
3622  jobid = line.strip().split()[-1]
3623  if not jobid:
3624  raise JobsubError(command, rc, jobout, joberr)
3625  else:
3626  print('Makeup action aborted because makeup job count is zero.')
3627 
3628  # Done.
3629 
3630  return jobid
3631 
3632 
3633 # Submit/makeup action.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def safeopen
Definition: project.py:4739
S join(S const &sep, Coll const &s)
Returns a concatenation of strings in s separated by sep.
def import_samweb
Definition: project.py:504
def dojobsub
Definition: project.py:2482
open(RACETRACK) or die("Could not open file $RACETRACK for writing")
def project.domerge (   stage,
  mergehist,
  mergentuple 
)

Definition at line 3721 of file project.py.

3722 def domerge(stage, mergehist, mergentuple):
3723 
     # Merge the analysis root files of this stage into a single anahist.root.
     #
     # Arguments:
     #   stage       - Stage object (uses stage.bookdir, stage.outdir, stage.merge).
     #   mergehist   - If true, merge with "hadd -T" (histograms only).
     #   mergentuple - Else if true, merge with plain "hadd".
     #                 Otherwise the stage-configured merge command is used.
     #
     # Raises RuntimeError if <bookdir>/filesana.list does not exist.
3724  hlist = []
3725  hnlist = os.path.join(stage.bookdir, 'filesana.list')
3726  if larbatch_posix.exists(hnlist):
3727  hlist = larbatch_posix.readlines(hnlist)
3728  else:
3729  raise RuntimeError('No filesana.list file found %s, run project.py --checkana' % hnlist)
3730 
     # Write the input file list into a local temporary file that the merge
     # command reads via the "@file" syntax below.
3731  histurlsname_temp = 'histurls.list'
3732  histurls = safeopen(histurlsname_temp)
3733 
3734  for hist in hlist:
3735  histurls.write('%s\n' % hist)
3736  histurls.close()
3737 
3738  if len(hlist) > 0:
3739  name = os.path.join(stage.outdir, 'anahist.root')
     # When the output lives on /pnfs (dCache), merge into a scratch
     # directory first and copy the result over afterwards.
3740  if name[0:6] == '/pnfs/':
3741  tempdir = '%s/mergentuple_%d_%d' % (project_utilities.get_scratch_dir(),
3742  os.getuid(),
3743  os.getpid())
3744  if not larbatch_posix.isdir(tempdir):
3745  larbatch_posix.makedirs(tempdir)
3746  name_temp = '%s/anahist.root' % tempdir
3747  else:
3748  name_temp = name
3749 
     # Choose the merge command per the mergehist/mergentuple flags.
3750  if mergehist:
3751  mergecom = "hadd -T"
3752  elif mergentuple:
3753  mergecom = "hadd"
3754  else:
3755  mergecom = stage.merge
3756 
3757  print("Merging %d root files using %s." % (len(hlist), mergecom))
3758 
3759  if larbatch_posix.exists(name_temp):
3760  larbatch_posix.remove(name_temp)
3761  comlist = mergecom.split()
     # "-f" overwrite target, "-k" skip corrupt inputs, "@file" = input list.
3762  comlist.extend(["-f", "-k", name_temp, '@' + histurlsname_temp])
3763  rc = subprocess.call(comlist, stdout=sys.stdout, stderr=sys.stderr)
     # NOTE(review): a nonzero merge status is only reported, not raised;
     # the copy below still proceeds (presumably a deliberate best effort).
3764  if rc != 0:
3765  print("%s exit status %d" % (mergecom, rc))
3766  if name != name_temp:
3767  if larbatch_posix.exists(name):
3768  larbatch_posix.remove(name)
3769  if larbatch_posix.exists(name_temp):
3770 
3771  # Copy merged file from scratch to the final output location.
3772  larbatch_posix.copy(name_temp, name)
3773  larbatch_posix.rmtree(tempdir)
     # Clean up the temporary local file list.
3774  larbatch_posix.remove(histurlsname_temp)
3775 
3776 
3777 # Sam audit.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def safeopen
Definition: project.py:4739
def domerge
Definition: project.py:3721
def project.doquickcheck (   project,
  stage,
  ana 
)

Definition at line 1578 of file project.py.

1579 def doquickcheck(project, stage, ana):
1580 
     # Quick validation of batch job output for one stage.
     #
     # Walks the leaf directories of stage.bookdir, checks each job's
     # missing_files.list sentinel, and aggregates the per-job bookkeeping
     # files (files.list, filesana.list, events.list, bad.list,
     # transferred_uris.list, files_*.list stream lists, sam_project.txt,
     # cpid.txt) into stage-level lists in stage.bookdir.  Where only one
     # good log directory exists, the stage-level list is a symlink to the
     # per-job file instead of an aggregated copy.
     #
     # Returns the number of errors found (1 if outdir/bookdir is missing).
1581  # Check that output and log directories exist. Dirs could be lost due to ifdhcp failures
1582  if not larbatch_posix.isdir(stage.outdir):
1583  print('Output directory %s does not exist.' % stage.outdir)
1584  return 1
1585 
1586  if not larbatch_posix.isdir(stage.bookdir):
1587  print('Log directory %s does not exist.' % stage.bookdir)
1588  return 1
1589 
1590  print('Checking directory %s' % stage.bookdir)
1591 
1592  #Aggregate the .list files from the bookdir up one dir. This is where the old docheck would put them, and it double-checks that the files made it back from the worker node.
1593 
1594  goodFiles = [] # list of art root files
1595  goodAnaFiles = [] # list of analysis root files
1596  eventLists = [] # list of art root files and number of events
1597  badLists = [] # list of bad root files
1598  anaFiles = [] # list of ana files
1599  transferredFiles = [] # list of transferred files
1600  streamLists = {} # dictionary which keeps track of files per stream
1601 
1602  sam_projects = [] # list of sam projects
1603  cpids = [] # list of consumer process ids
1604 
1605  goodLogDirs = set() # Set of log directories.
1606  nErrors = 0 # Number of errors uncovered
1607 
1608  for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):
1609 
1610  # Only examine files in leaf directories.
1611 
1612  if len(subdirs) != 0:
1613  continue
1614 
1615  #skip start and stop project jobs for now; still harvest their sam project name
1616  if log_subpath[-6:] == '_start' or log_subpath[-5:] == '_stop':
1617  filename = os.path.join(log_subpath, 'sam_project.txt')
1618  if larbatch_posix.exists(filename):
1619  sam_project = larbatch_posix.readlines(filename)[0].strip()
1620  if sam_project != '' and not sam_project in sam_projects:
1621  sam_projects.append(sam_project)
1622  continue
1623 
1624 
1625  print('Doing quick check of directory %s.' % log_subpath)
1626 
1627  subdir = os.path.relpath(log_subpath, stage.bookdir)
1628 
1629  out_subpath = os.path.join(stage.outdir, subdir)
1630  dirok = project_utilities.fast_isdir(log_subpath)
1631 
1632  #first check the missing_files.list sentinel written by the worker
1633 
1634 
1635  validateOK = 1
1636 
1637  missingfilesname = os.path.join(log_subpath, 'missing_files.list')
1638 
1639  #print missingfilesname
1640 
1641  try:
1642  #print 'Reading %s' % missingfilesname
1643  missingfiles = project_utilities.saferead(missingfilesname)
1644  #if we can't find missing_files the check will not work
1645  except:
1646  print('Cannot open file: %s' % missingfilesname)
1647  validateOK = 0
1648 
1649 
1650  if validateOK == 1 and len(missingfiles) == 0:
1651  print('%s exists, but is empty' % missingfilesname)
1652  validateOK = 0
1653 
1654 
     # The first line holds the worker's missing-file count; nonzero means
     # the job did not return all its files.
1655  if validateOK == 1:
1656  line = missingfiles[0]
1657  line = line.strip('\n')
1658  if( int(line) != 0 ):
1659  validateOK = 0
1660 
1661 
1662  #If the validation failed, count the error and skip this directory.
1663  if validateOK != 1:
1664  nErrors += 1
1665  continue
1666 
1667  #Copy files.
1668  #print 'Appending Files'
1669 
1670  # Check existence of sam_project.txt and cpid.txt.
1671  # Update sam_projects and cpids.
1672 
1673  if stage.inputdef != '':
1674 
1675  filename1 = os.path.join(log_subpath, 'sam_project.txt')
1676  if not larbatch_posix.exists(filename1):
1677  print('Could not find file sam_project.txt')
1678  nErrors += 1
1679  else:
1680  sam_project = larbatch_posix.readlines(filename1)[0].strip()
1681  if not sam_project in sam_projects:
1682  sam_projects.append(sam_project)
1683 
1684  filename2 = os.path.join(log_subpath, 'cpid.txt')
1685  if not larbatch_posix.exists(filename2):
1686  print('Could not find file cpid.txt')
1687  nErrors += 1
1688  else:
1689  cpid = larbatch_posix.readlines(filename2)[0].strip()
1690  if not cpid in cpids:
1691  cpids.append(cpid)
1692 
     # scan_file returns [-1] on failure, otherwise the file's entries.
1693  filelistsrc = os.path.join(log_subpath, 'files.list')
1694  tmpArray = scan_file(filelistsrc)
1695 
1696  if( tmpArray == [ -1 ] ):
1697  nErrors += 1
1698  else:
1699  goodFiles.extend(tmpArray)
1700 
1701  fileanalistsrc = os.path.join(log_subpath, 'filesana.list')
1702  tmpArray = scan_file(fileanalistsrc)
1703 
1704  if( not tmpArray == [ -1 ] ):
1705  goodAnaFiles.extend(tmpArray)
1706 
1707  eventlistsrc = os.path.join(log_subpath, 'events.list')
1708 
1709  tmpArray = scan_file(eventlistsrc)
1710 
1711  if( tmpArray == [ -1 ] ):
1712  nErrors += 1
1713  else:
1714  eventLists.extend(tmpArray)
1715 
1716 
1717  badfilesrc = os.path.join(log_subpath, 'bad.list')
1718 
1719 
1720  tmpArray = scan_file(badfilesrc)
1721 
1722  #bad list being empty is okay
1723  if( tmpArray == [ -1 ] ):
1724  pass
1725  else:
1726  badLists.extend(tmpArray)
1727 
1728  '''
1729  missingfilesrc = os.path.join(log_subpath, 'missing_files.list')
1730 
1731  tmpArray = scan_file(missingfilesrc)
1732 
1733  if( tmpArray == [ -1 ] ):
1734  nErrors += 1
1735  else:
1736  missingLists.extend(tmpArray)
1737  '''
1738 
1739  #if ana:
1740  # filesanalistsrc = os.path.join(log_subpath, 'filesana.list')
1741 
1742  # tmpArray = scan_file(filesanalistsrc)
1743 
1744  # if( tmpArray == [ -1 ] ):
1745  # nErrors += 1
1746  # else:
1747  # anaFiles.extend(tmpArray)
1748 
1749  urislistsrc = os.path.join(log_subpath, 'transferred_uris.list')
1750 
1751  tmpArray = scan_file(urislistsrc)
1752 
1753  #empty uri file is not necessarily an error
1754  if( tmpArray == [ -1 ] ):
1755  pass
1756  else:
1757  transferredFiles.extend(tmpArray)
1758  #create a list of files_*.list files. These are outputs from specific streams
1759  streamList = larbatch_posix.listdir(log_subpath)
1760 
1761  for stream in streamList:
1762  if( stream[:6] != "files_" ):
1763  continue
1764  streamfilesrc = os.path.join(log_subpath, stream)
1765  #print stream
1766  tmpArray = scan_file(streamfilesrc)
1767  if( tmpArray == [ -1 ] ):
1768  nErrors += 1
1769  else:
1770  if(streamLists.get(stream, "empty") == "empty" ):
1771  streamLists[stream] = tmpArray
1772  else:
1773  streamLists[stream].extend(tmpArray)
1774 
1775  if validateOK == 1:
1776  goodLogDirs.add(log_subpath)
1777 
     # Write the 'checked' marker file that other actions test for.
1778  checkfilename = os.path.join(stage.bookdir, 'checked')
1779  checkfile = safeopen(checkfilename)
1780  checkfile.write('\n')
1781  checkfile.close()
1782 
1783  #create the input files.list for the next stage
1784  filelistdest = os.path.join(stage.bookdir, 'files.list')
1785  if larbatch_posix.exists(filelistdest):
1786  #print 'Deleting %s' % filelistdest
1787  larbatch_posix.remove(filelistdest)
1788  if len(goodLogDirs) == 1:
1789  src = '%s/files.list' % goodLogDirs.copy().pop()
1790  #print 'Symlinking %s to %s' % (src, filelistdest)
1791  larbatch_posix.symlink(src, filelistdest)
1792  else:
1793  #print 'Aggregating files.list'
1794  inputList = safeopen(filelistdest)
1795  for goodFile in goodFiles:
1796  #print goodFile
1797  inputList.write("%s\n" % goodFile)
1798  inputList.close()
1799  if len(goodFiles) == 0:
1800  project_utilities.addLayerTwo(filelistdest)
1801 
1802  #create the aggregated filesana.list
1803  fileanalistdest = os.path.join(stage.bookdir, 'filesana.list')
1804  if larbatch_posix.exists(fileanalistdest):
1805  #print 'Deleting %s' % fileanalistdest
1806  larbatch_posix.remove(fileanalistdest)
1807  if len(goodLogDirs) == 1:
1808  src = '%s/filesana.list' % goodLogDirs.copy().pop()
1809  #print 'Symlinking %s to %s' % (src, fileanalistdest)
1810  larbatch_posix.symlink(src, fileanalistdest)
1811  else:
1812  #print 'Aggregating filesana.list'
1813  anaList = safeopen(fileanalistdest)
1814  for goodAnaFile in goodAnaFiles:
1815  #print goodAnaFile
1816  anaList.write("%s\n" % goodAnaFile)
1817  anaList.close()
1818  if len(goodAnaFiles) == 0:
1819  project_utilities.addLayerTwo(fileanalistdest)
1820 
1821  #create the events.list for the next step
1822  eventlistdest = os.path.join(stage.bookdir, 'events.list')
1823  if larbatch_posix.exists(eventlistdest):
1824  #print 'Deleting %s' % eventlistdest
1825  larbatch_posix.remove(eventlistdest)
1826  if len(goodLogDirs) == 1:
1827  src = '%s/events.list' % goodLogDirs.copy().pop()
1828  #print 'Symlinking %s to %s' % (src, eventlistdest)
1829  larbatch_posix.symlink(src, eventlistdest)
1830  else:
1831  #print 'Aggregating events.list'
1832  eventsOutList = safeopen(eventlistdest)
1833  for event in eventLists:
1834  #print event
1835  eventsOutList.write("%s\n" % event)
1836  eventsOutList.close()
1837  if len(eventLists) == 0:
1838  project_utilities.addLayerTwo(eventlistdest)
1839 
1840  #create the bad.list for makeup jobs
1841  if(len(badLists) > 0):
1842  badlistdest = os.path.join(stage.bookdir, 'bad.list')
1843  badOutList = safeopen(badlistdest)
1844  for bad in badLists:
1845  badOutList.write("%s\n" % bad)
1846  badOutList.close()
1847  #project_utilities.addLayerTwo(badlistdest)
1848 
1849  #create the missing_files.list for makeup jobs
1850  missing_files = []
1851  if stage.inputdef == '' and not stage.pubs_output:
1852  input_files = get_input_files(stage)
1853  if len(input_files) > 0:
1854  missing_files = list(set(input_files) - set(transferredFiles))
1855 
1856  if len(missing_files) > 0:
1857  missinglistdest = os.path.join(stage.bookdir, 'missing_files.list')
1858  missingOutList = safeopen(missinglistdest)
1859  for missing in missing_files:
1860  missingOutList.write("%s\n" % missing)
1861  missingOutList.close()
     # NOTE(review): if the next line is ever re-enabled, it should probably
     # take the path missinglistdest, not the file object missingOutList.
1862  #project_utilities.addLayerTwo(missingOutList)
1863 
1864  #create the transferred_uris for the next step
1865  urilistdest = os.path.join(stage.bookdir, 'transferred_uris.list')
1866  if larbatch_posix.exists(urilistdest):
1867  #print 'Deleting %s' % urilistdest
1868  larbatch_posix.remove(urilistdest)
1869  if len(goodLogDirs) == 1 and len(transferredFiles) > 0:
1870  src = '%s/transferred_uris.list' % goodLogDirs.copy().pop()
1871  #print 'Symlinking %s to %s' % (src, urilistdest)
1872  larbatch_posix.symlink(src, urilistdest)
1873  else:
1874  #print 'Aggregating transferred_uris.list'
1875  uriOutList = safeopen(urilistdest)
1876  for uri in transferredFiles:
1877  #print uri
1878  uriOutList.write("%s\n" % uri)
1879  uriOutList.close()
1880  if len(transferredFiles) == 0:
1881  project_utilities.addLayerTwo(urilistdest)
1882 
     # Sam bookkeeping (only relevant when the stage has a sam input dataset).
1883  if stage.inputdef != '':
1884  samprojectdest = os.path.join(stage.bookdir, 'sam_projects.list')
1885  if larbatch_posix.exists(samprojectdest):
1886  #print 'Deleting %s' % samprojectdest
1887  larbatch_posix.remove(samprojectdest)
1888  if len(goodLogDirs) == 1:
1889  src = '%s/sam_project.txt' % goodLogDirs.copy().pop()
1890  #print 'Symlinking %s to %s' % (src, samprojectdest)
1891  larbatch_posix.symlink(src, samprojectdest)
1892  else:
1893  #print 'Aggregating sam_projects.list'
1894  samprojectfile = safeopen(samprojectdest)
1895  for sam in sam_projects:
1896  samprojectfile.write("%s\n" % sam)
1897  samprojectfile.close()
1898  if len(sam_projects) == 0:
1899  project_utilities.addLayerTwo(samprojectdest)
1900 
1901  cpiddest = os.path.join(stage.bookdir, 'cpids.list')
1902  if larbatch_posix.exists(cpiddest):
1903  #print 'Deleting %s' % cpiddest
1904  larbatch_posix.remove(cpiddest)
1905  if len(goodLogDirs) == 1:
1906  src = '%s/cpid.txt' % goodLogDirs.copy().pop()
1907  #print 'Symlinking %s to %s' % (src, cpiddest)
1908  larbatch_posix.symlink(src, cpiddest)
1909  else:
1910  #print 'Aggregating cpids.list'
1911  cpidfile = safeopen(cpiddest)
1912  for cp in cpids:
1913  cpidfile.write("%s \n" % cp)
1914  cpidfile.close()
1915  if len(cpids) == 0:
1916  project_utilities.addLayerTwo(cpiddest)
1917 
1918 
     # Per-stream file lists (files_<stream>.list), symlinked or aggregated
     # the same way as the other bookkeeping lists above.
1919  for stream in streamLists:
1920  streamdest = os.path.join(stage.bookdir, stream)
1921  if larbatch_posix.exists(streamdest):
1922  #print 'Deleting %s' % streamdest
1923  larbatch_posix.remove(streamdest)
1924  if len(goodLogDirs) == 1:
1925  src = '%s/%s' % (goodLogDirs.copy().pop(), stream)
1926  #print 'Symlinking %s to %s' % (src, streamdest)
1927  larbatch_posix.symlink(src, streamdest)
1928  else:
1929  #print 'Aggregating %s' % stream
1930  streamOutList = safeopen(streamdest)
1931  for line in streamLists[stream]:
1932  streamOutList.write("%s\n" % line)
1933  streamOutList.close()
1934  if len(streamLists[stream]) == 0:
1935  project_utilities.addLayerTwo(streamdest)
1936 
1937 
1938 
1939 
1940 
1941  print('Number of errors = %d' % nErrors)
1942 
1943  return nErrors
1944 
1945 # Check project results in the specified directory.
then if[["$THISISATEST"==1]]
Definition: neoSmazza.sh:95
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def safeopen
Definition: project.py:4739
def doquickcheck
Definition: project.py:1578
def get_input_files
Definition: project.py:941
def scan_file
Definition: project.py:4748
list
Definition: file_to_url.sh:28
def project.doshorten (   stage)

Definition at line 970 of file project.py.

971 def doshorten(stage):
972 
973  # Untar log files.
974 
975  untarlog(stage)
976 
977  # Loop over .root files in outdir.
978 
979  for out_subpath, subdirs, files in larbatch_posix.walk(stage.outdir):
980 
981  # Only examine files in leaf directories.
982 
983  if len(subdirs) != 0:
984  continue
985 
986  subdir = os.path.relpath(out_subpath, stage.outdir)
987  log_subpath = os.path.join(stage.bookdir, subdir)
988 
989  for file in files:
990  if file[-5:] == '.root':
991  if len(file) >= 200:
992 
993  # Long filenames renamed here.
994 
995  file_path = os.path.join(out_subpath, file)
996  shortfile = file[:150] + str(uuid.uuid4()) + '.root'
997  shortfile_path = os.path.join(out_subpath, shortfile)
998  print('%s\n->%s\n' % (file_path, shortfile_path))
999  larbatch_posix.rename(file_path, shortfile_path)
1000 
1001  # Also rename corresponding json file, if it exists.
1002 
1003  json_path = os.path.join(log_subpath, file + '.json')
1004  if larbatch_posix.exists(json_path):
1005  shortjson = shortfile + '.json'
1006  shortjson_path = os.path.join(log_subpath, shortjson)
1007  print('%s\n->%s\n' % (json_path, shortjson_path))
1008  larbatch_posix.rename(json_path, shortjson_path)
1009 
1010  return
1011 
1012 # Untar tarred up log files in logdir into bookdir.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def untarlog
Definition: project.py:1013
def doshorten
Definition: project.py:970
def project.dostatus (   projects)

Definition at line 625 of file project.py.

626 def dostatus(projects):
627 
628  # BatchStatus constructor requires authentication.
629 
630  project_utilities.test_kca()
631 
632  # For backward compatibility, allow this function to be called with
633  # either a single project or a list of projects.
634 
635  prjs = projects
636  if type(projects) != type([]) and type(projects) != type(()):
637  prjs = [projects]
638 
639  project_status = ProjectStatus(prjs)
640  batch_status = BatchStatus(prjs)
641 
642  for project in prjs:
643 
644  print('\nProject %s:' % project.name)
645 
646  # Loop over stages.
647 
648  for stage in project.stages:
649 
650  stagename = stage.name
651  stage_status = project_status.get_stage_status(stagename)
652  b_stage_status = batch_status.get_stage_status(stagename)
653  if stage_status.exists:
654  print('\nStage %s: %d art files, %d events, %d analysis files, %d errors, %d missing files.' % (
655  stagename, stage_status.nfile, stage_status.nev, stage_status.nana,
656  stage_status.nerror, stage_status.nmiss))
657  else:
658  print('\nStage %s output directory does not exist.' % stagename)
659  print('Stage %s batch jobs: %d idle, %d running, %d held, %d other.' % (
660  stagename, b_stage_status[0], b_stage_status[1], b_stage_status[2], b_stage_status[3]))
661  return
662 
663 
664 # Recursively extract projects from an xml element.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def dostatus
Definition: project.py:625
def project.dosubmit (   project,
  stage,
  makeup = False,
  recur = False,
  dryrun = False 
)

Definition at line 3634 of file project.py.

3635 def dosubmit(project, stage, makeup=False, recur=False, dryrun=False):
3636 
3637  # Make sure we have a kerberos ticket.
3638 
3639  project_utilities.test_kca()
3640 
3641  # Make sure jobsub_client is available.
3642 
3643  larbatch_utilities.test_jobsub()
3644 
3645  # Run presubmission check script.
3646 
3647  ok = stage.checksubmit()
3648  if ok != 0:
3649  print('No jobs submitted.')
3650  return
3651 
3652  # In pubs mode, delete any existing work, log, or output
3653  # directories, since there is no separate makeup action for pubs
3654  # mode.
3655 
3656  if stage.pubs_output and not stage.dynamic:
3657  if larbatch_posix.exists(stage.workdir):
3658  larbatch_posix.rmtree(stage.workdir)
3659  if larbatch_posix.exists(stage.outdir):
3660  larbatch_posix.rmtree(stage.outdir)
3661  if larbatch_posix.exists(stage.logdir):
3662  larbatch_posix.rmtree(stage.logdir)
3663  if larbatch_posix.exists(stage.bookdir):
3664  larbatch_posix.rmtree(stage.bookdir)
3665 
3666  # Make or check directories.
3667 
3668  if not makeup:
3669  stage.makedirs()
3670  else:
3671  stage.checkdirs()
3672 
3673  # Check input files.
3674 
3675  ok = stage.checkinput(checkdef=True)
3676  if ok != 0:
3677  print('No jobs submitted.')
3678  return
3679 
3680  # Make sure output and log directories are empty (submit only).
3681 
3682  if not makeup and not recur and not stage.dynamic:
3683  if len(larbatch_posix.listdir(stage.outdir)) != 0:
3684  raise RuntimeError('Output directory %s is not empty.' % stage.outdir)
3685  if len(larbatch_posix.listdir(stage.logdir)) != 0:
3686  raise RuntimeError('Log directory %s is not empty.' % stage.logdir)
3687  if len(larbatch_posix.listdir(stage.bookdir)) != 0:
3688  raise RuntimeError('Log directory %s is not empty.' % stage.bookdir)
3689 
3690  # Copy files to workdir and issue jobsub command to submit jobs.
3691 
3692  jobid = dojobsub(project, stage, makeup, recur, dryrun)
3693 
3694  # Append jobid to file "jobids.list" in the log directory.
3695 
3696  jobids_filename = os.path.join(stage.bookdir, 'jobids.list')
3697  jobids = []
3698  if larbatch_posix.exists(jobids_filename):
3699  lines = larbatch_posix.readlines(jobids_filename)
3700  for line in lines:
3701  id = line.strip()
3702  if len(id) > 0:
3703  jobids.append(id)
3704  if len(jobid) > 0:
3705  jobids.append(jobid)
3706 
3707  jobid_file = safeopen(jobids_filename)
3708  for jobid in jobids:
3709  jobid_file.write('%s\n' % jobid)
3710  jobid_file.close()
3711 
3712  # Done.
3713 
3714  return jobid
3715 
3716 # Merge histogram files.
3717 # If mergehist is True, merge histograms using "hadd -T".
3718 # If mergentuple is True, do full merge using "hadd".
3719 # If neither argument is True, do custom merge using merge program specified
3720 # in xml stage.
def dosubmit
Definition: project.py:3634
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def safeopen
Definition: project.py:4739
def dojobsub
Definition: project.py:2482
def project.dotest_declarations (   dim)

Definition at line 2158 of file project.py.

2159 def dotest_declarations(dim):
2160 
2161  # Initialize samweb.
2162 
2163  import_samweb()
2164 
2165  # Do query
2166 
2167  result = samweb.listFilesSummary(dimensions=dim)
2168  for key in list(result.keys()):
2169  print('%s: %s' % (key, result[key]))
2170 
2171  return 0
2172 
2173 # Check sam dataset definition.
2174 # Return 0 if dataset is defined or definition name is null.
2175 # Return nonzero if dataset is not defined.
def dotest_declarations
Definition: project.py:2158
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
list
Definition: file_to_url.sh:28
def project.dotest_definition (   defname)

Definition at line 2217 of file project.py.

2218 def dotest_definition(defname):
2219 
2220  # Initialize samweb.
2221 
2222  import_samweb()
2223 
2224  # Do query
2225 
2226  result = samweb.listFilesSummary(defname=defname)
2227  for key in list(result.keys()):
2228  print('%s: %s' % (key, result[key]))
2229 
2230  return 0
2231 
2232 # Delete sam dataset definition.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def dotest_definition
Definition: project.py:2217
list
Definition: file_to_url.sh:28
def project.doundefine (   defname)

Definition at line 2233 of file project.py.

2234 def doundefine(defname):
2235 
2236  if defname == '':
2237  return 1
2238 
2239  # Initialize samweb.
2240 
2241  import_samweb()
2242 
2243  # See if this definition already exists.
2244 
2245  def_exists = False
2246  try:
2247  desc = samweb.descDefinition(defname=defname)
2248  def_exists = True
2249  except samweb_cli.exceptions.DefinitionNotFound:
2250  pass
2251 
2252  # Make report and maybe make definition.
2253 
2254  if def_exists:
2255  print('Deleting definition: %s' % defname)
2256  project_utilities.test_kca()
2257  samweb.deleteDefinition(defname=defname)
2258  else:
2259  print('No such definition: %s' % defname)
2260 
2261  return 0
2262 
2263 # Check disk locations. Maybe add or remove locations.
2264 # This method only generates output and returns zero.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def import_samweb
Definition: project.py:504
def doundefine
Definition: project.py:2233
def project.find_projects (   element,
  check = True 
)

Definition at line 665 of file project.py.

666 def find_projects(element, check=True):
667 
668  projects = []
669 
670  # First check if the input element is a project. In that case, return a
671  # list containing the project name as the single element of the list.
672 
673  if element.nodeName == 'project':
674  default_input_by_stage = {}
675  project = ProjectDef(element, '', default_input_by_stage, check=check)
676  projects.append(project)
677 
678  else:
679 
680  # Input element is not a project.
681  # Loop over subelements.
682 
683  default_input = ''
684  default_input_by_stage = {}
685  subelements = element.getElementsByTagName('project')
686  for subelement in subelements:
687  project = ProjectDef(subelement, default_input, default_input_by_stage, check=check)
688  projects.append(project)
689  for stage in project.stages:
690  stage_list = os.path.join(stage.bookdir, 'files.list')
691  default_input_by_stage[stage.name] = stage_list
692  default_input = stage_list
693 
694  # Done.
695 
696  return projects
697 
698 
699 # Extract all projects from the specified xml file.
def find_projects
Definition: project.py:665
def project.get_input_files (   stage)

Definition at line 941 of file project.py.

942 def get_input_files(stage):
943 
944  # In case of single file or file list input, files are returned exactly
 945  # as specified, which would normally be as the full path.
946  # In case of sam input, only the file names are returned (guaranteed unique).
947 
948  result = []
949  if stage.inputfile != '':
950  result.append(stage.inputfile)
951 
952  elif stage.inputlist != '' and larbatch_posix.exists(stage.inputlist):
953  try:
954  input_filenames = larbatch_posix.readlines(stage.inputlist)
955  for line in input_filenames:
956  words = line.split()
957  result.append(words[0])
958  except:
959  pass
960 
961  elif stage.inputdef != '':
962  import_samweb()
963  result = samweb.listFiles(defname=stage.inputdef)
964 
965  # Done.
966 
967  return result
968 
969 # Shorten root file names to have fewer than 200 characters.
def import_samweb
Definition: project.py:504
def get_input_files
Definition: project.py:941
def project.get_project (   xmlfile,
  projectname = '',
  stagename = '',
  check = True 
)

Definition at line 755 of file project.py.

756 def get_project(xmlfile, projectname='', stagename='', check=True):
757  projects = get_projects(xmlfile, check=check)
758  project = select_project(projects, projectname, stagename)
759  return project
760 
761 # Extract the next sequential stage
def select_project
Definition: project.py:740
def get_project
Definition: project.py:755
def get_projects
Definition: project.py:700
def project.get_projects (   xmlfile,
  check = True 
)

Definition at line 700 of file project.py.

701 def get_projects(xmlfile, check=True):
702 
703  # Cache results.
704 
705  if xmlfile in get_projects.cache:
706  return get_projects.cache[xmlfile]
707 
708  # Parse xml (returns xml document).
709 
710  if xmlfile == '-':
711  xml = sys.stdin
712  elif xmlfile.find(':') < 0:
713  xml = open(xmlfile)
714  else:
715  xml = urlrequest.urlopen(xmlfile)
716  doc = parse(xml)
717 
718  # Extract root element.
719 
720  root = doc.documentElement
721 
722  # Find project names in the root element.
723 
724  projects = find_projects(root, check=check)
725 
726  # Cache result.
727 
728  get_projects.cache[xmlfile] = projects
729 
730  # Done.
731 
732  return projects
733 
734 # Get_projects result cache.
735 
736 get_projects.cache = {}
737 
738 
739 # Select the specified project.
def find_projects
Definition: project.py:665
def get_projects
Definition: project.py:700
open(RACETRACK) or die("Could not open file $RACETRACK for writing")
def project.get_pubs_stage (   xmlfile,
  projectname,
  stagename,
  run,
  subruns,
  version = None 
)

Definition at line 814 of file project.py.

815 def get_pubs_stage(xmlfile, projectname, stagename, run, subruns, version=None):
816  projects = get_projects(xmlfile)
817  project = select_project(projects, projectname, stagename)
818  if project == None:
819  raise RuntimeError('No project selected for projectname=%s, stagename=%s' % (
820  projectname, stagename))
821  stage = project.get_stage(stagename)
822  if stage == None:
823  raise RuntimeError('No stage selected for projectname=%s, stagename=%s' % (
824  projectname, stagename))
825  get_projects.cache = {}
826  stage.pubsify_input(run, subruns, version)
827  stage.pubsify_output(run, subruns, version)
828  get_projects.cache = {}
829  return project, stage
830 
831 
832 # Check a single root file.
833 # Returns a 2-tuple containing the number of events and stream name.
834 # The number of events conveys the following information:
835 # 1. Number of events (>=0) in TTree named "Events."
836 # 2. -1 if root file does not contain an Events TTree, but is otherwise valid (openable).
837 # 3. -2 for error (root file does not exist or is not openable).
def select_project
Definition: project.py:740
def get_pubs_stage
Definition: project.py:814
def get_projects
Definition: project.py:700
def project.help ( )

Definition at line 3876 of file project.py.

3877 def help():
3878 
3879  filename = sys.argv[0]
3880  file = open(filename, 'r')
3881 
3882  doprint=0
3883 
3884  for line in file.readlines():
3885  if line[2:12] == 'project.py':
3886  doprint = 1
3887  elif line[0:6] == '######' and doprint:
3888  doprint = 0
3889  if doprint:
3890  if len(line) > 2:
3891  print(line[2:], end=' ')
3892  else:
3893  print()
3894 
3895 # Normalize xml path.
3896 #
3897 # Don't modify xml file path for any of the following cases.
3898 #
3899 # 1. xmlfile contains character ':'. In this case xmlfile may be a url.
3900 # 2. xmlfile starts with '/', './' or '../'.
3901 # 3. xmlfile is '-'. Stands for standard input.
3902 #
3903 # Otherwise, assume that xmlfile is a relative path. In this case, convert it to
3904 # an absolute path relative to the current working directory, or directory contained
3905 # in environment variable XMLPATH (colon-separated list of directories).
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def help
Definition: project.py:3876
open(RACETRACK) or die("Could not open file $RACETRACK for writing")
def project.import_samweb ( )

Definition at line 504 of file project.py.

505 def import_samweb():
506 
 507  # Get initialized samweb, if not already done.
508 
509  global samweb
510  global extractor_dict
511  global expMetaData
512 
513 
514  if samweb == None:
515  samweb = project_utilities.samweb()
516  from extractor_dict import expMetaData
517 
518 # Multi-project clean function.
def import_samweb
Definition: project.py:504
def project.main (   argv)

Definition at line 3965 of file project.py.

3966 def main(argv):
3967 
3968  # Parse arguments.
3969 
3970  xmlfile = ''
3971  projectname = ''
3972  stagenames = ['']
3973  lines = ''
3974  site = ''
3975  cpu = 0
3976  disk = ''
3977  memory = 0
3978  inputdef = ''
3979  merge = 0
3980  submit = 0
3981  recur = 0
3982  pubs = 0
3983  pubs_run = 0
3984  pubs_subruns = []
3985  pubs_version = None
3986  check = 0
3987  checkana = 0
3988  shorten = 0
3989  fetchlog = 0
3990  mergehist = 0
3991  mergentuple = 0
3992  audit = 0
3993  stage_status = 0
3994  makeup = 0
3995  clean = 0
3996  clean_one = 0
3997  dump_project = 0
3998  dump_stage = 0
3999  dryrun = 0
4000  nocheck = 0
4001  print_outdir = 0
4002  print_logdir = 0
4003  print_workdir = 0
4004  print_bookdir = 0
4005  fcl = 0
4006  defname = 0
4007  do_input_files = 0
4008  do_check_submit = 0
4009  do_check_input = 0
4010  declare = 0
4011  declare_ana = 0
4012  define = 0
4013  define_ana = 0
4014  undefine = 0
4015  check_declarations = 0
4016  check_declarations_ana = 0
4017  test_declarations = 0
4018  test_declarations_ana = 0
4019  check_definition = 0
4020  check_definition_ana = 0
4021  test_definition = 0
4022  test_definition_ana = 0
4023  add_locations = 0
4024  add_locations_ana = 0
4025  check_locations = 0
4026  check_locations_ana = 0
4027  upload = 0
4028  upload_ana = 0
4029  check_tape = 0
4030  check_tape_ana = 0
4031  clean_locations = 0
4032  clean_locations_ana = 0
4033  remove_locations = 0
4034  remove_locations_ana = 0
4035 
4036  args = argv[1:]
4037  while len(args) > 0:
4038  if args[0] == '-h' or args[0] == '--help' :
4039  help()
4040  return 0
4041  elif args[0] == '-xh' or args[0] == '--xmlhelp' :
4042  xmlhelp()
4043  return 0
4044  elif args[0] == '--xml' and len(args) > 1:
4045  xmlfile = args[1]
4046  del args[0:2]
4047  elif args[0] == '--project' and len(args) > 1:
4048  projectname = args[1]
4049  del args[0:2]
4050  elif args[0] == '--stage' and len(args) > 1:
4051  stagenames = args[1].split(',')
4052  del args[0:2]
4053  elif args[0] == '--tmpdir' and len(args) > 1:
4054  os.environ['TMPDIR'] = args[1]
4055  del args[0:2]
4056  elif args[0] == '--lines' and len(args) > 1:
4057  lines = args[1]
4058  del args[0:2]
4059  elif args[0] == '--site' and len(args) > 1:
4060  site = args[1]
4061  del args[0:2]
4062  elif args[0] == '--cpu' and len(args) > 1:
4063  cpu = int(args[1])
4064  del args[0:2]
4065  elif args[0] == '--disk' and len(args) > 1:
4066  disk = args[1]
4067  del args[0:2]
4068  elif args[0] == '--memory' and len(args) > 1:
4069  memory = int(args[1])
4070  del args[0:2]
4071  elif args[0] == '--inputdef' and len(args) > 1:
4072  inputdef = args[1]
4073  del args[0:2]
4074  elif args[0] == '--submit':
4075  submit = 1
4076  del args[0]
4077  elif args[0] == '--recur':
4078  recur = 1
4079  del args[0]
4080  elif args[0] == '--pubs' and len(args) > 2:
4081  pubs = 1
4082  pubs_run = int(args[1])
4083  pubs_subruns = project_utilities.parseInt(args[2])
4084  del args[0:3]
4085  if len(args) > 0 and args[0] != '' and args[0][0] != '-':
4086  pubs_version = int(args[0])
4087  del args[0]
4088  elif args[0] == '--check':
4089  check = 1
4090  del args[0]
4091  elif args[0] == '--checkana':
4092  checkana = 1
4093  del args[0]
4094  elif args[0] == '--shorten':
4095  shorten = 1
4096  del args[0]
4097  elif args[0] == '--fetchlog':
4098  fetchlog = 1
4099  del args[0]
4100  elif args[0] == '--merge':
4101  merge = 1
4102  del args[0]
4103  elif args[0] == '--mergehist':
4104  mergehist = 1
4105  del args[0]
4106  elif args[0] == '--mergentuple':
4107  mergentuple = 1
4108  del args[0]
4109  elif args[0] == '--audit':
4110  audit = 1
4111  del args[0]
4112  elif args[0] == '--status':
4113  stage_status = 1
4114  del args[0]
4115  elif args[0] == '--makeup':
4116  makeup = 1
4117  del args[0]
4118  elif args[0] == '--clean':
4119  clean = 1
4120  del args[0]
4121  elif args[0] == '--clean_one':
4122  clean_one = 1
4123  del args[0]
4124  elif args[0] == '--dump_project':
4125  dump_project = 1
4126  del args[0]
4127  elif args[0] == '--dump_stage':
4128  dump_stage = 1
4129  del args[0]
4130  elif args[0] == '--dryrun':
4131  dryrun = 1
4132  del args[0]
4133  elif args[0] == '--nocheck':
4134  nocheck = 1
4135  del args[0]
4136  elif args[0] == '--outdir':
4137  print_outdir = 1
4138  del args[0]
4139  elif args[0] == '--logdir':
4140  print_logdir = 1
4141  del args[0]
4142  elif args[0] == '--workdir':
4143  print_workdir = 1
4144  del args[0]
4145  elif args[0] == '--bookdir':
4146  print_bookdir = 1
4147  del args[0]
4148  elif args[0] == '--fcl':
4149  fcl = 1
4150  del args[0]
4151  elif args[0] == '--defname':
4152  defname = 1
4153  del args[0]
4154  elif args[0] == '--input_files':
4155  do_input_files = 1
4156  del args[0]
4157  elif args[0] == '--check_submit':
4158  do_check_submit = 1
4159  del args[0]
4160  elif args[0] == '--check_input':
4161  do_check_input = 1
4162  del args[0]
4163  elif args[0] == '--declare':
4164  declare = 1
4165  del args[0]
4166  elif args[0] == '--declare_ana':
4167  declare_ana = 1
4168  del args[0]
4169  elif args[0] == '--define':
4170  define = 1
4171  del args[0]
4172  elif args[0] == '--define_ana':
4173  define_ana = 1
4174  del args[0]
4175  elif args[0] == '--undefine':
4176  undefine = 1
4177  del args[0]
4178  elif args[0] == '--check_declarations':
4179  check_declarations = 1
4180  del args[0]
4181  elif args[0] == '--check_declarations_ana':
4182  check_declarations_ana = 1
4183  del args[0]
4184  elif args[0] == '--test_declarations':
4185  test_declarations = 1
4186  del args[0]
4187  elif args[0] == '--test_declarations_ana':
4188  test_declarations_ana = 1
4189  del args[0]
4190  elif args[0] == '--check_definition':
4191  check_definition = 1
4192  del args[0]
4193  elif args[0] == '--check_definition_ana':
4194  check_definition_ana = 1
4195  del args[0]
4196  elif args[0] == '--test_definition':
4197  test_definition = 1
4198  del args[0]
4199  elif args[0] == '--test_definition_ana':
4200  test_definition_ana = 1
4201  del args[0]
4202  elif args[0] == '--add_locations':
4203  add_locations = 1
4204  del args[0]
4205  elif args[0] == '--add_locations_ana':
4206  add_locations_ana = 1
4207  del args[0]
4208  elif args[0] == '--check_locations':
4209  check_locations = 1
4210  del args[0]
4211  elif args[0] == '--check_locations_ana':
4212  check_locations_ana = 1
4213  del args[0]
4214  elif args[0] == '--upload':
4215  upload = 1
4216  del args[0]
4217  elif args[0] == '--upload_ana':
4218  upload_ana = 1
4219  del args[0]
4220  elif args[0] == '--check_tape':
4221  check_tape = 1
4222  del args[0]
4223  elif args[0] == '--check_tape_ana':
4224  check_tape_ana = 1
4225  del args[0]
4226  elif args[0] == '--clean_locations':
4227  clean_locations = 1
4228  del args[0]
4229  elif args[0] == '--clean_locations_ana':
4230  clean_locations_ana = 1
4231  del args[0]
4232  elif args[0] == '--remove_locations':
4233  remove_locations = 1
4234  del args[0]
4235  elif args[0] == '--remove_locations_ana':
4236  remove_locations_ana = 1
4237  del args[0]
4238  else:
4239  print('Unknown option %s' % args[0])
4240  return 1
4241 
4242  # Normalize xml file path.
4243 
4244  xmlfile = normxmlpath(xmlfile)
4245 
4246  # Make sure xmlfile was specified.
4247 
4248  if xmlfile == '':
4249  print('No xml file specified. Type "project.py -h" for help.')
4250  return 1
4251 
4252  # Make sure that no more than one action was specified (except clean, shorten, and info
4253  # options).
4254 
4255  num_action = submit + check + checkana + fetchlog + merge + mergehist + mergentuple + audit + stage_status + makeup + define + define_ana + undefine + declare + declare_ana
4256  if num_action > 1:
4257  print('More than one action was specified.')
4258  return 1
4259 
4260  # Extract all project definitions.
4261 
4262  projects = get_projects(xmlfile, check=(not nocheck))
4263 
4264  # Get the selected project element.
4265 
4266  for stagename in stagenames:
4267  project = select_project(projects, projectname, stagename)
4268  if project != None:
4269  if projectname == '':
4270  projectname = project.name
4271  else:
4272  raise RuntimeError('No project selected.\n')
4273 
4274  # Do clean action now. Cleaning can be combined with submission.
4275 
4276  if clean:
4277  for stagename in stagenames:
4278  docleanx(projects, projectname, stagename, clean_descendants = True)
4279 
4280  # Do clean_one action now. Cleaning can be combined with submission.
4281 
4282  if clean_one:
4283  for stagename in stagenames:
4284  docleanx(projects, projectname, stagename, clean_descendants = False)
4285 
4286  # Do stage_status now.
4287 
4288  if stage_status:
4289  dostatus(projects)
4290  return 0
4291 
4292  # Get the current stage definition, and pubsify it if necessary.
4293  # Also process any command line stage configuration overrides.
4294 
4295  stages = {}
4296  for stagename in stagenames:
4297  stage = project.get_stage(stagename)
4298  stages[stagename] = stage
4299 
4300  # Command line configuration overrides handled here.
4301 
4302  if lines != '':
4303  stage.lines = lines
4304  if site != '':
4305  stage.site = site
4306  if cpu != 0:
4307  stage.cpu = cpu
4308  if disk != '':
4309  stage.disk = disk
4310  if memory != 0:
4311  stage.memory = memory
4312  if inputdef != '':
4313  stage.inputdef = inputdef
4314  stage.inputfile = ''
4315  stage.inputlist = ''
4316  if recur != 0:
4317  stage.recur = recur
4318 
4319  # Pubs mode overrides handled here.
4320 
4321  if pubs:
4322  stage.pubsify_input(pubs_run, pubs_subruns, pubs_version)
4323  stage.pubsify_output(pubs_run, pubs_subruns, pubs_version)
4324 
4325  # Make recursive dataset definition here, if necessary.
4326 
4327  if stage.recur and stage.inputdef != '' and stage.basedef != '':
4328 
4329  # First check if stage.inputdef already exists.
4330 
4331  import_samweb()
4332  def_exists = False
4333  try:
4334  desc = samweb.descDefinition(defname=stage.inputdef)
4335  def_exists = True
4336  except samweb_cli.exceptions.DefinitionNotFound:
4337  pass
4338 
4339  if not def_exists:
4340 
4341  # Recursive definition doesn't exist, so create it.
4342 
4343  project_utilities.test_kca()
4344 
4345  # Start sam dimension with the base dataset.
4346 
4347  dim = ''
4348 
4349  # Add minus clause.
4350 
4351  project_wildcard = '%s_%%' % samweb.makeProjectName(stage.inputdef).rsplit('_',1)[0]
4352  if stage.recurtype == 'snapshot':
4353  dim = 'defname: %s minus snapshot_for_project_name %s' % \
4354  (stage.basedef, project_wildcard)
4355  elif stage.recurtype == 'consumed':
4356  dim = 'defname: %s minus (project_name %s and consumed_status consumed)' % \
4357  (stage.basedef, project_wildcard)
4358 
4359  elif stage.recurtype == 'child':
4360 
4361  # In case of multiple data streams, generate one clause for each
4362  # data stream.
4363 
4364  nstream = 1
4365  if stage.data_stream != None and len(stage.data_stream) > 0:
4366  nstream = len(stage.data_stream)
4367 
4368  dim = ''
4369  for istream in range(nstream):
4370  idim = project_utilities.dimensions_datastream(project, stage,
4371  ana=False, index=istream)
4372  if idim.find('anylocation') > 0:
4373  idim = idim.replace('anylocation', 'physical')
4374  else:
4375  idim += ' with availability physical'
4376 
4377  if len(dim) > 0:
4378  dim += ' or '
4379  dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
4380 
4381  if stage.activebase != '':
4382  activedef = '%s_active' % stage.activebase
4383  waitdef = '%s_wait' % stage.activebase
4384  dim += ' minus defname: %s' % activedef
4385  dim += ' minus defname: %s' % waitdef
4386  project_utilities.makeDummyDef(activedef)
4387  project_utilities.makeDummyDef(waitdef)
4388 
4389  elif stage.recurtype == 'anachild':
4390 
4391  # In case of multiple data streams, generate one clause for each
4392  # data stream.
4393 
4394  nstream = 1
4395  if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
4396  nstream = len(stage.ana_data_stream)
4397 
4398  dim = ''
4399  for istream in range(nstream):
4400  idim = project_utilities.dimensions_datastream(project, stage,
4401  ana=True, index=istream)
4402  if idim.find('anylocation') > 0:
4403  idim = idim.replace('anylocation', 'physical')
4404  else:
4405  idim += ' with availability physical'
4406 
4407  if len(dim) > 0:
4408  dim += ' or '
4409  dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
4410 
4411  if stage.activebase != '':
4412  activedef = '%s_active' % stage.activebase
4413  waitdef = '%s_wait' % stage.activebase
4414  dim += ' minus defname: %s' % activedef
4415  dim += ' minus defname: %s' % waitdef
4416  project_utilities.makeDummyDef(activedef)
4417  project_utilities.makeDummyDef(waitdef)
4418 
4419  elif stage.recurtype != '' and stage.recurtype != 'none':
4420  raise RuntimeError('Unknown recursive type %s.' % stage.recurtype)
4421 
4422  # Add "with limit" clause.
4423 
4424  if stage.recurlimit != 0:
4425  dim += ' with limit %d' % stage.recurlimit
4426 
4427  # Create definition.
4428 
4429  print('Creating recursive dataset definition %s' % stage.inputdef)
4430  project_utilities.test_kca()
4431  samweb.createDefinition(defname=stage.inputdef, dims=dim)
4432 
4433 
4434  # Do dump stage action now.
4435 
4436  if dump_stage:
4437  for stagename in stagenames:
4438  print('Stage %s:' % stagename)
4439  stage = stages[stagename]
4440  print(stage)
4441 
4442  # Do dump project action now.
4443 
4444  if dump_project:
4445  print(project)
4446 
4447  # Do outdir action now.
4448 
4449  if print_outdir:
4450  for stagename in stagenames:
4451  print('Stage %s:' % stagename)
4452  stage = stages[stagename]
4453  print(stage.outdir)
4454 
4455  # Do logdir action now.
4456 
4457  if print_logdir:
4458  for stagename in stagenames:
4459  print('Stage %s:' % stagename)
4460  stage = stages[stagename]
4461  print(stage.logdir)
4462 
4463  # Do workdir action now.
4464 
4465  if print_workdir:
4466  for stagename in stagenames:
4467  print('Stage %s:' % stagename)
4468  stage = stages[stagename]
4469  print(stage.workdir)
4470 
4471  # Do bookdir action now.
4472 
4473  if print_bookdir:
4474  for stagename in stagenames:
4475  print('Stage %s:' % stagename)
4476  stage = stages[stagename]
4477  print(stage.bookdir)
4478 
4479  # Do defname action now.
4480 
4481  if defname:
4482  for stagename in stagenames:
4483  print('Stage %s:' % stagename)
4484  stage = stages[stagename]
4485  if stage.defname != '':
4486  print(stage.defname)
4487 
4488  # Do input_files action now.
4489 
4490  if do_input_files:
4491  for stagename in stagenames:
4492  print('Stage %s:' % stagename)
4493  stage = stages[stagename]
4494  input_files = get_input_files(stage)
4495  for input_file in input_files:
4496  print(input_file)
4497 
4498  # Do check_submit action now.
4499 
4500  if do_check_submit:
4501  for stagename in stagenames:
4502  print('Stage %s:' % stagename)
4503  stage = stages[stagename]
4504  stage.checksubmit()
4505 
4506  # Do check_input action now.
4507 
4508  if do_check_input:
4509  for stagename in stagenames:
4510  print('Stage %s:' % stagename)
4511  stage = stages[stagename]
4512  stage.checkinput(checkdef=True)
4513 
4514  # Do shorten action now.
4515 
4516  if shorten:
4517  for stagename in stagenames:
4518  print('Stage %s:' % stagename)
4519  stage = stages[stagename]
4520  doshorten(stage)
4521 
4522  # Do actions.
4523 
4524  rc = 0
4525 
4526  if submit or makeup:
4527 
4528  # Submit jobs.
4529 
4530  for stagename in stagenames:
4531  print('Stage %s:' % stagename)
4532 
4533  if project_utilities.check_running(xmlfile, stagename):
4534  print('Skipping job submission because similar job submission process is running.')
4535  else:
4536  stage = stages[stagename]
4537  dosubmit(project, stage, makeup, stage.recur, dryrun)
4538 
4539  if check or checkana:
4540 
4541  # Check results from specified project stage.
4542 
4543  for stagename in stagenames:
4544  print('Stage %s:' % stagename)
4545  stage = stages[stagename]
4546  docheck(project, stage, checkana or stage.ana, stage.validate_on_worker)
4547 
4548  if fetchlog:
4549 
4550  # Fetch logfiles.
4551 
4552  for stagename in stagenames:
4553  print('Stage %s:' % stagename)
4554  stage = stages[stagename]
4555  rc += dofetchlog(project, stage)
4556 
4557  if mergehist or mergentuple or merge:
4558 
4559  # Make merged histogram or ntuple files using proper hadd option.
4560  # Makes a merged root file called anahist.root in the project output directory
4561 
4562  for stagename in stagenames:
4563  print('Stage %s:' % stagename)
4564  stage = stages[stagename]
4565  domerge(stage, mergehist, mergentuple)
4566 
4567  if audit:
4568 
4569  # Sam audit.
4570 
4571  for stagename in stagenames:
4572  print('Stage %s:' % stagename)
4573  stage = stages[stagename]
4574  doaudit(stage)
4575 
4576  if check_definition or define:
4577 
4578  # Make sam dataset definition.
4579 
4580  for stagename in stagenames:
4581  print('Stage %s:' % stagename)
4582  stage = stages[stagename]
4583  if stage.ana:
4584  if stage.ana_defname == '':
4585  print('No sam analysis dataset definition name specified for this stage.')
4586  return 1
4587  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4588  docheck_definition(stage.ana_defname, dim, define)
4589  else:
4590  if stage.defname == '':
4591  print('No sam dataset definition name specified for this stage.')
4592  return 1
4593  dim = project_utilities.dimensions_datastream(project, stage, ana=False)
4594  docheck_definition(stage.defname, dim, define)
4595 
4596  if check_definition_ana or define_ana:
4597 
4598  # Make sam dataset definition for analysis files.
4599 
4600  for stagename in stagenames:
4601  print('Stage %s:' % stagename)
4602  stage = stages[stagename]
4603  if stage.ana_defname == '':
4604  print('No sam analysis dataset definition name specified for this stage.')
4605  return 1
4606  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4607  docheck_definition(stage.ana_defname, dim, define_ana)
4608 
4609  if test_definition:
4610 
4611  # Print summary of files returned by dataset definition.
4612 
4613  for stagename in stagenames:
4614  print('Stage %s:' % stagename)
4615  stage = stages[stagename]
4616  if stage.ana:
4617  if stage.ana_defname == '':
4618  print('No sam dataset definition name specified for this stage.')
4619  return 1
4620  rc += dotest_definition(stage.ana_defname)
4621  else:
4622  if stage.defname == '':
4623  print('No sam dataset definition name specified for this stage.')
4624  return 1
4625  rc += dotest_definition(stage.defname)
4626 
4627  if test_definition_ana:
4628 
4629  # Print summary of files returned by analysis dataset definition.
4630 
4631  for stagename in stagenames:
4632  print('Stage %s:' % stagename)
4633  stage = stages[stagename]
4634  if stage.ana_defname == '':
4635  print('No sam dataset definition name specified for this stage.')
4636  return 1
4637  rc += dotest_definition(stage.ana_defname)
4638 
4639  if undefine:
4640 
4641  # Delete sam dataset definition.
4642 
4643  for stagename in stagenames:
4644  print('Stage %s:' % stagename)
4645  stage = stages[stagename]
4646  if stage.defname == '':
4647  print('No sam dataset definition name specified for this stage.')
4648  return 1
4649  rc += doundefine(stage.defname)
4650 
4651  if check_declarations or declare:
4652 
4653  # Check sam declarations.
4654 
4655  for stagename in stagenames:
4656  print('Stage %s:' % stagename)
4657  stage = stages[stagename]
4658  docheck_declarations(stage.bookdir, stage.outdir, declare, ana=stage.ana)
4659 
4660  if check_declarations_ana or declare_ana:
4661 
4662  # Check sam analysis declarations.
4663 
4664  for stagename in stagenames:
4665  print('Stage %s:' % stagename)
4666  stage = stages[stagename]
4667  docheck_declarations(stage.bookdir, stage.outdir, declare_ana, ana=True)
4668 
4669  if test_declarations:
4670 
4671  # Print summary of declared files.
4672 
4673  for stagename in stagenames:
4674  print('Stage %s:' % stagename)
4675  stage = stages[stagename]
4676  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4677  rc += dotest_declarations(dim)
4678 
4679  if test_declarations_ana:
4680 
4681  # Print summary of declared files.
4682 
4683  for stagename in stagenames:
4684  print('Stage %s:' % stagename)
4685  stage = stages[stagename]
4686  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4687  rc += dotest_declarations(dim)
4688 
4689  if check_locations or add_locations or clean_locations or remove_locations or upload:
4690 
4691  # Check sam disk locations.
4692 
4693  for stagename in stagenames:
4694  print('Stage %s:' % stagename)
4695  stage = stages[stagename]
4696  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4697  docheck_locations(dim, stage.outdir,
4698  add_locations, clean_locations, remove_locations,
4699  upload)
4700 
4701  if check_locations_ana or add_locations_ana or clean_locations_ana or \
4702  remove_locations_ana or upload_ana:
4703 
4704  # Check sam disk locations.
4705 
4706  for stagename in stagenames:
4707  print('Stage %s:' % stagename)
4708  stage = stages[stagename]
4709  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4710  docheck_locations(dim, stage.outdir,
4711  add_locations_ana, clean_locations_ana, remove_locations_ana,
4712  upload_ana)
4713 
4714  if check_tape:
4715 
4716  # Check sam tape locations.
4717 
4718  for stagename in stagenames:
4719  print('Stage %s:' % stagename)
4720  stage = stages[stagename]
4721  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4722  docheck_tape(dim)
4723 
4724  if check_tape_ana:
4725 
4726  # Check analysis file sam tape locations.
4727 
4728  for stagename in stagenames:
4729  print('Stage %s:' % stagename)
4730  stage = stages[stagename]
4731  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4732  docheck_tape(dim)
4733 
4734  # Done.
4735 
4736  return rc
4737 
4738 # Open and truncate a file for writing using larbatch_posix.open.
def dotest_declarations
Definition: project.py:2158
def select_project
Definition: project.py:740
def dosubmit
Definition: project.py:3634
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def docheck
Definition: project.py:1087
def help
Definition: project.py:3876
def import_samweb
Definition: project.py:504
def dotest_definition
Definition: project.py:2217
def doundefine
Definition: project.py:2233
def dostatus
Definition: project.py:625
def docheck_definition
Definition: project.py:2176
def main
Definition: project.py:3965
def xmlhelp
Definition: project.py:3944
def docheck_declarations
Definition: project.py:2071
def doshorten
Definition: project.py:970
def get_projects
Definition: project.py:700
def doaudit
Definition: project.py:3778
def docleanx
Definition: project.py:519
def get_input_files
Definition: project.py:941
def docheck_tape
Definition: project.py:2432
def normxmlpath
Definition: project.py:3906
def domerge
Definition: project.py:3721
def docheck_locations
Definition: project.py:2265
def dofetchlog
Definition: project.py:1946
def project.next_stage (   projects,
  stagename,
  circular = False 
)

Definition at line 762 of file project.py.

def next_stage(projects, stagename, circular=False):

    # Return the stage that immediately follows the named stage, scanning
    # every project's stage list in order.  In circular mode, wrap around
    # to the very first stage overall when the named stage is the last one
    # (or was not found at all).  Returns None when no successor exists.

    return_next = False
    for prj in projects:
        for stg in prj.stages:
            if return_next:
                return stg
            if stg.name == stagename:
                return_next = True

    # Circular mode: fall back to the first stage of the first project.

    if circular and len(projects) > 0 and len(projects[0].stages) > 0:
        return projects[0].stages[0]

    # Nothing appropriate found.

    return None
786 
787 # Extract the previous sequential stage.
def next_stage
Definition: project.py:762
def project.normxmlpath (   xmlfile)

Definition at line 3906 of file project.py.

def normxmlpath(xmlfile):

    # Resolve a bare xml file name by searching the current working
    # directory and then each colon-separated directory in $XMLPATH.
    # Inputs that are already absolute, explicitly relative ('./', '../'),
    # contain a colon (e.g. remote paths/URLs), or are '-' (stdin) are
    # returned unchanged, as is any name not found in the search path.

    looks_bare = (':' not in xmlfile
                  and not xmlfile.startswith(('/', './', '../'))
                  and xmlfile != '-')
    if not looks_bare:
        return xmlfile

    # Build search list: cwd first, then $XMLPATH entries (if any).

    search_dirs = [os.getcwd()]
    if 'XMLPATH' in os.environ:
        search_dirs.extend(os.environ['XMLPATH'].split(':'))

    # First directory containing the file wins.

    for search_dir in search_dirs:
        candidate = os.path.join(search_dir, xmlfile)
        if os.path.exists(candidate):
            return candidate

    # Not found anywhere; return the input unchanged.

    return xmlfile
3942 
3943 # Print xml help.
def normxmlpath
Definition: project.py:3906
def project.previous_stage (   projects,
  stagename,
  circular = False 
)

Definition at line 788 of file project.py.

def previous_stage(projects, stagename, circular=False):

    # Return the stage immediately preceding the named stage, scanning
    # every project's stage list in order.  In circular mode the search
    # wraps, so the predecessor of the first stage is the very last stage
    # overall; that wrap value is also returned when the named stage is
    # not found.  Non-circular default is None.

    if circular and len(projects) > 0 and len(projects[-1].stages) > 0:
        prev = projects[-1].stages[-1]
    else:
        prev = None

    for prj in projects:
        for stg in prj.stages:
            if stg.name == stagename:
                return prev
            prev = stg

    # Named stage never matched; return the default answer.

    return prev
811 
812 # Extract pubsified stage from xml file.
813 # Return value is a 2-tuple (project, stage).
def previous_stage
Definition: project.py:788
def project.safeopen (   destination)

Definition at line 4739 of file project.py.

def safeopen(destination):
    # Open destination for writing as a brand-new file: delete any
    # pre-existing file first so the open always truncates a fresh file
    # (avoids ownership/permission surprises when overwriting).
    if larbatch_posix.exists(destination):
        larbatch_posix.remove(destination)
    return larbatch_posix.open(destination, 'w')
4745 
4746 # Invoke main program.
4747 
# Utility function to scan a file and return its contents as a list
def safeopen
Definition: project.py:4739
def project.scan_file (   fileName)

Definition at line 4748 of file project.py.

def scan_file(fileName):

    # Read fileName via project_utilities.saferead and return its lines,
    # stripped of surrounding whitespace, as a list.
    #
    # Returns the sentinel list [ -1 ] when the file cannot be read or is
    # empty (callers, e.g. the missing_files check, test for this value).

    try:
        fileList = project_utilities.saferead(fileName)
    except Exception:
        # File missing or unreadable; was a bare "except:", which also
        # swallowed SystemExit/KeyboardInterrupt.  Narrowed to Exception.
        return [ -1 ]

    if len(fileList) == 0:
        # File exists but is empty; same sentinel as the unreadable case.
        return [ -1 ]

    return [line.strip() for line in fileList]
def scan_file
Definition: project.py:4748
def project.select_project (   projects,
  projectname,
  stagename 
)

Definition at line 740 of file project.py.

def select_project(projects, projectname, stagename):

    # Return the first project whose name matches projectname (empty
    # string matches any project) and which contains a stage matching
    # stagename (empty string matches any stage).  Returns None when no
    # project/stage combination matches.

    for prj in projects:
        if projectname not in ('', prj.name):
            continue
        for stg in prj.stages:
            if stagename in ('', stg.name):
                return prj

    # Failure if we fall out of the loop.

    return None
752 
753 
754 # Extract the specified project element from xml file.
def select_project
Definition: project.py:740
def project.untarlog (   stage)

Definition at line 1013 of file project.py.

def untarlog(stage):

    # Copy batch log tarballs ("log*.tar") found in leaf directories of
    # stage.logdir into the matching subdirectory of stage.bookdir and
    # extract them there.  A "<tarball>.done" flag file records a
    # successful extraction, so repeated invocations are idempotent.
    #
    # Argument: stage -- object providing logdir and bookdir attributes
    # (presumably a StageDef; confirm against caller).
    # Returns None.

    # Walk over logdir to look for log files.

    for log_subpath, subdirs, files in larbatch_posix.walk(stage.logdir):

        # Only examine leaf directories.

        if len(subdirs) != 0:
            continue
        subdir = os.path.relpath(log_subpath, stage.logdir)
        if subdir == '.':
            continue
        book_subpath = os.path.join(stage.bookdir, subdir)
        for file in files:
            if file.startswith('log') and file.endswith('.tar'):
                src = '%s/%s' % (log_subpath, file)
                dst = '%s/%s' % (book_subpath, file)
                flag = '%s.done' % dst

                # Decide if we need to copy this tarball to bookdir.
                # (No copy when logdir and bookdir are the same tree,
                # or when the .done flag says it was already handled.)

                if dst != src and not larbatch_posix.exists(flag):

                    # Copy tarball to bookdir.

                    print('Copying tarball %s into %s' % (src, book_subpath))
                    if not larbatch_posix.isdir(book_subpath):
                        larbatch_posix.makedirs(book_subpath)
                    larbatch_posix.copy(src, dst)

                # Decide if we need to extract this tarball into bookdir.

                if not larbatch_posix.exists(flag):

                    # Extract tarball.  The excludes skip bulky or
                    # uninteresting payload (beam data, core dumps,
                    # databases, scripts, nested tarballs) so only the
                    # log text lands in bookdir.

                    print('Extracting tarball %s' % dst)
                    jobinfo = subprocess.Popen(['tar','-xf', dst, '-C', book_subpath,
                                                '--exclude=beam*.dat',
                                                '--exclude=beam*.info',
                                                '--exclude=core*',
                                                '--exclude=*.db',
                                                '--exclude=*.sh',
                                                '--exclude=*.py*',
                                                '--exclude=*.tar'],
                                               stdout=subprocess.PIPE,
                                               stderr=subprocess.PIPE)
                    jobout, joberr = jobinfo.communicate()
                    jobout = convert_str(jobout)
                    joberr = convert_str(joberr)
                    rc = jobinfo.poll()
                    if rc != 0:
                        # Extraction failed: report, and leave no flag so
                        # a later invocation retries this tarball.
                        print(jobout)
                        print(joberr)
                        print('Failed to extract log tarball in %s' % dst)

                    else:

                        # Create flag file marking successful extraction.

                        f = larbatch_posix.open(flag, 'w')
                        f.write('\n') # Don't want zero size file.
                        f.close()

                        # Delete copy of tarball (keep the original in
                        # logdir if it lives elsewhere).

                        if dst != src:
                            larbatch_posix.remove(dst)

    return
1085 
1086 # Check project results in the specified directory.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def untarlog
Definition: project.py:1013
def project.xmlhelp ( )

Definition at line 3944 of file project.py.

def xmlhelp():

    # Print the XML file structure help text embedded in this script's own
    # comment header.  The help block starts at a comment line reading
    # '# XML file structure' (matched at columns 2-20) and ends at the
    # next '######' divider line.  Returns None.

    filename = sys.argv[0]

    doprint = 0

    # Use a context manager so the script's file handle is always closed
    # (the original opened the file and never closed it).  Also avoid
    # shadowing the builtin 'file' name.
    with open(filename, 'r') as fh:
        for line in fh:
            if line[2:20] == 'XML file structure':
                doprint = 1
            elif line[0:6] == '######' and doprint:
                doprint = 0
            if doprint:
                if len(line) > 2:
                    # Strip the leading '# ' comment prefix when printing.
                    print(line[2:], end=' ')
                else:
                    print()
3962 
3963 
3964 # Main program.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def xmlhelp
Definition: project.py:3944
open(RACETRACK) or die("Could not open file $RACETRACK for writing")

Variable Documentation

project.extractor_dict = None

Definition at line 497 of file project.py.

project.proxy_ok = False

Definition at line 498 of file project.py.

project.samweb = None

Definition at line 496 of file project.py.