471 from __future__ 
import absolute_import
 
  472 from __future__ 
import print_function
 
  473 import sys, os, stat, subprocess, shutil, json, getpass, uuid, tempfile, hashlib
 
  475     import urllib.request 
as urlrequest
 
  477     import urllib 
as urlrequest
 
  478 import larbatch_posix
 
  483     import Queue 
as queue
 
  484 from xml.dom.minidom 
import parse
 
  485 import project_utilities, root_metadata
 
  486 from project_modules.projectdef 
import ProjectDef
 
  487 from project_modules.projectstatus 
import ProjectStatus
 
  488 from project_modules.batchstatus 
import BatchStatus
 
  489 from project_modules.jobsuberror 
import JobsubError
 
  490 from project_modules.ifdherror 
import IFDHError
 
  491 import larbatch_utilities
 
  492 from larbatch_utilities 
import convert_str
 
  493 from larbatch_utilities 
import convert_bytes
 
  497 extractor_dict = 
None    
  509     global extractor_dict
 
  514         samweb = project_utilities.samweb()
 
  515         from extractor_dict 
import expMetaData
 
  519 def docleanx(projects, projectname, stagename, clean_descendants = True):
 
  520     print(projectname, stagename)
 
  534     cleaned_bookdirs = []
 
  538     done_cleaning = 
False 
  539     while not done_cleaning:
 
  541         cleaned_something = 
False 
  545         for project 
in projects:
 
  546             for stage 
in project.stages:
 
  548                 clean_this_stage = 
False 
  552                 if not stage.bookdir 
in cleaned_bookdirs:
 
  556                     if (projectname == 
'' or project.name == projectname) 
and \
 
  557                        (stagename == 
'' or stage.name == stagename):
 
  559                         clean_this_stage = 
True 
  564                     elif clean_descendants 
and stage.inputlist != 
'' and \
 
  565                          os.path.dirname(stage.inputlist) 
in cleaned_bookdirs:
 
  567                         clean_this_stage = 
True 
  572                         cleaned_something = 
True 
  573                         cleaned_bookdirs.append(stage.bookdir)
 
  575                         print(
'Clean project %s, stage %s' % (project.name, stage.name))
 
  579                         if larbatch_posix.exists(stage.outdir):
 
  580                             dir_uid = larbatch_posix.stat(stage.outdir).st_uid
 
  581                             if dir_uid == uid 
or dir_uid == euid:
 
  582                                 print(
'Clean directory %s.' % stage.outdir)
 
  583                                 larbatch_posix.rmtree(stage.outdir)
 
  585                                 raise RuntimeError(
'Owner mismatch, delete %s manually.' % stage.outdir)
 
  589                         if larbatch_posix.exists(stage.logdir):
 
  590                             dir_uid = larbatch_posix.stat(stage.logdir).st_uid
 
  591                             if dir_uid == uid 
or dir_uid == euid:
 
  592                                 print(
'Clean directory %s.' % stage.logdir)
 
  593                                 larbatch_posix.rmtree(stage.logdir)
 
  595                                 raise RuntimeError(
'Owner mismatch, delete %s manually.' % stage.logdir)
 
  599                         if larbatch_posix.exists(stage.workdir):
 
  600                             dir_uid = larbatch_posix.stat(stage.workdir).st_uid
 
  601                             if dir_uid == uid 
or dir_uid == euid:
 
  602                                 print(
'Clean directory %s.' % stage.workdir)
 
  603                                 larbatch_posix.rmtree(stage.workdir)
 
  605                                 raise RuntimeError(
'Owner mismatch, delete %s manually.' % stage.workdir)
 
  609                         if larbatch_posix.exists(stage.bookdir):
 
  610                             dir_uid = larbatch_posix.stat(stage.bookdir).st_uid
 
  611                             if dir_uid == uid 
or dir_uid == euid:
 
  612                                 print(
'Clean directory %s.' % stage.bookdir)
 
  613                                 larbatch_posix.rmtree(stage.bookdir)
 
  615                                 raise RuntimeError(
'Owner mismatch, delete %s manually.' % stage.bookdir)
 
  617         done_cleaning = 
not cleaned_something
 
  629     project_utilities.test_kca()
 
  638     project_status = ProjectStatus(prjs)
 
  639     batch_status = BatchStatus(prjs)
 
  643         print(
'\nProject %s:' % project.name)
 
  647         for stage 
in project.stages:
 
  649             stagename = stage.name
 
  650             stage_status = project_status.get_stage_status(stagename)
 
  651             b_stage_status = batch_status.get_stage_status(stagename)
 
  652             if stage_status.exists:
 
  653                 print(
'\nStage %s: %d art files, %d events, %d analysis files, %d errors, %d missing files.' % (
 
  654                     stagename, stage_status.nfile, stage_status.nev, stage_status.nana,
 
  655                     stage_status.nerror, stage_status.nmiss))
 
  657                 print(
'\nStage %s output directory does not exist.' % stagename)
 
  658             print(
'Stage %s batch jobs: %d idle, %d running, %d held, %d other.' % (
 
  659                 stagename, b_stage_status[0], b_stage_status[1], b_stage_status[2], b_stage_status[3]))
 
  672     if element.nodeName == 
'project':
 
  673         default_input_by_stage = {}
 
  674         project = ProjectDef(element, 
'', default_input_by_stage, check=check)
 
  675         projects.append(project)
 
  683         default_input_by_stage = {}
 
  684         subelements = element.getElementsByTagName(
'project')
 
  685         for subelement 
in subelements:
 
  686             project = ProjectDef(subelement, default_input, default_input_by_stage, check=check)
 
  687             projects.append(project)
 
  688             for stage 
in project.stages:
 
  689                 stage_list = os.path.join(stage.bookdir, 
'files.list')
 
  690                 default_input_by_stage[stage.name] = stage_list
 
  691                 default_input = stage_list
 
  704     if xmlfile 
in get_projects.cache:
 
  705         return get_projects.cache[xmlfile]
 
  711     elif xmlfile.find(
':') < 0:
 
  714         xml = urlrequest.urlopen(xmlfile)
 
  719     root = doc.documentElement
 
  727     get_projects.cache[xmlfile] = projects
 
  735 get_projects.cache = {}
 
  742     for project 
in projects:
 
  743         if projectname == 
'' or projectname == project.name:
 
  744             for stage 
in project.stages:
 
  745                 if stagename == 
'' or stagename == stage.name:
 
  755 def get_project(xmlfile, projectname='', stagename='', check=True):
 
  767     for project 
in projects:
 
  771         for stage 
in project.stages:
 
  774             if stage.name == stagename:
 
  779     if circular 
and len(projects) > 0 
and len(projects[0].stages) > 0:
 
  780         return projects[0].stages[0]
 
  793     if circular 
and len(projects) > 0 
and len(projects[-1].stages) > 0:
 
  794         result = projects[-1].stages[-1]
 
  798     for project 
in projects:
 
  802         for stage 
in project.stages:
 
  803             if stage.name == stagename:
 
  818         raise RuntimeError(
'No project selected for projectname=%s, stagename=%s' % (
 
  819             projectname, stagename))
 
  820     stage = project.get_stage(stagename)
 
  822         raise RuntimeError(
'No stage selected for projectname=%s, stagename=%s' % (
 
  823             projectname, stagename))
 
  824     get_projects.cache = {}
 
  825     stage.pubsify_input(run, subruns, version)
 
  826     stage.pubsify_output(run, subruns, version)
 
  827     get_projects.cache = {}
 
  828     return project, stage
 
  847     if not larbatch_posix.exists(path):
 
  852     json_path = os.path.join(logdir, os.path.basename(path) + 
'.json')
 
  853     if larbatch_posix.exists(json_path):
 
  858             lines = larbatch_posix.readlines(json_path)
 
  873             if len(
list(md.keys())) > 0:
 
  877                     nevroot = int(md[
'events'])
 
  878                 if 'data_stream' in md:
 
  879                     stream = md[
'data_stream']
 
  880                 result = (nevroot, stream)
 
  908     print(
'Checking root files in directory %s.' % outdir)
 
  909     filenames = larbatch_posix.listdir(outdir)
 
  910     for filename 
in filenames:
 
  911         name, ext = os.path.splitext(filename)
 
  912         if len(ext) > 0 
and ext[1:] 
in data_file_types:
 
  913             path = os.path.join(outdir, filename)
 
  919                 roots.append((os.path.join(outdir, filename), nevroot, stream))
 
  925                 hists.append(os.path.join(outdir, filename))
 
  932                 print(
'Warning: File %s in directory %s is not a valid root file.' % (filename, outdir))
 
  936     return (nev, roots, hists)
 
  948     if stage.inputfile != 
'':
 
  949         result.append(stage.inputfile)
 
  951     elif stage.inputlist != 
'' and larbatch_posix.exists(stage.inputlist):
 
  953             input_filenames = larbatch_posix.readlines(stage.inputlist)
 
  954             for line 
in input_filenames:
 
  956                 result.append(words[0])
 
  960     elif stage.inputdef != 
'':
 
  962         result = samweb.listFiles(defname=stage.inputdef)
 
  978     for out_subpath, subdirs, files 
in larbatch_posix.walk(stage.outdir):
 
  982         if len(subdirs) != 0:
 
  985         subdir = os.path.relpath(out_subpath, stage.outdir)
 
  986         log_subpath = os.path.join(stage.bookdir, subdir)
 
  989             if file[-5:] == 
'.root':
 
  994                     file_path = os.path.join(out_subpath, file)
 
  995                     shortfile = file[:150] + str(uuid.uuid4()) + 
'.root' 
  996                     shortfile_path = os.path.join(out_subpath, shortfile)
 
  997                     print(
'%s\n->%s\n' % (file_path, shortfile_path))
 
  998                     larbatch_posix.rename(file_path, shortfile_path)
 
 1002                     json_path = os.path.join(log_subpath, file + 
'.json')
 
 1003                     if larbatch_posix.exists(json_path):
 
 1004                         shortjson = shortfile + 
'.json' 
 1005                         shortjson_path = os.path.join(log_subpath, shortjson)
 
 1006                         print(
'%s\n->%s\n' % (json_path, shortjson_path))
 
 1007                         larbatch_posix.rename(json_path, shortjson_path)
 
 1017     for log_subpath, subdirs, files 
in larbatch_posix.walk(stage.logdir):
 
 1021         if len(subdirs) != 0:
 
 1023         subdir = os.path.relpath(log_subpath, stage.logdir)
 
 1026         book_subpath = os.path.join(stage.bookdir, subdir)
 
 1028             if file.startswith(
'log') 
and file.endswith(
'.tar'):
 
 1029                 src = 
'%s/%s' % (log_subpath, file)
 
 1030                 dst = 
'%s/%s' % (book_subpath, file)
 
 1031                 flag = 
'%s.done' % dst
 
 1035                 if dst != src 
and not larbatch_posix.exists(flag):
 
 1039                     print(
'Copying tarball %s into %s' % (src, book_subpath))
 
 1040                     if not larbatch_posix.isdir(book_subpath):
 
 1041                         larbatch_posix.makedirs(book_subpath)
 
 1042                     larbatch_posix.copy(src, dst)
 
 1046                 if not larbatch_posix.exists(flag):
 
 1050                     print(
'Extracting tarball %s' % dst)
 
 1051                     jobinfo = subprocess.Popen([
'tar',
'-xf', dst, 
'-C', book_subpath,
 
 1052                                                 '--exclude=beam*.dat',
 
 1053                                                 '--exclude=beam*.info',
 
 1059                                                stdout=subprocess.PIPE,
 
 1060                                                stderr=subprocess.PIPE)
 
 1061                     jobout, joberr = jobinfo.communicate()
 
 1068                         print(
'Failed to extract log tarball in %s' % dst)
 
 1074                         f = larbatch_posix.open(flag, 
'w')
 
 1081                             larbatch_posix.remove(dst)
 
 1142     if quick == 1 
and not ana:
 
 1149     if not larbatch_posix.exists(stage.outdir):
 
 1150         print(
'Output directory %s does not exist.' % stage.outdir)
 
 1152     if not larbatch_posix.exists(stage.bookdir):
 
 1153         print(
'Log directory %s does not exist.' % stage.bookdir)
 
 1157     has_metadata = project.file_type != 
'' or project.run_type != 
'' 
 1158     has_input = stage.inputfile != 
'' or stage.inputlist != 
'' or stage.inputdef != 
'' 
 1159     print(
'Checking directory %s' % stage.bookdir)
 
 1177     for log_subpath, subdirs, files 
in larbatch_posix.walk(stage.bookdir):
 
 1181         if len(subdirs) != 0:
 
 1184         subdir = os.path.relpath(log_subpath, stage.bookdir)
 
 1187         out_subpath = os.path.join(stage.outdir, subdir)
 
 1188         dirok = project_utilities.fast_isdir(log_subpath)
 
 1192         if dirok 
and log_subpath[-6:] == 
'_start':
 
 1193             filename = os.path.join(log_subpath, 
'sam_project.txt')
 
 1194             if larbatch_posix.exists(filename):
 
 1195                 sam_project = larbatch_posix.readlines(filename)[0].
strip()
 
 1196                 if sam_project != 
'' and not sam_project 
in sam_projects:
 
 1197                     sam_projects.append(sam_project)
 
 1201         if dirok 
and not subdir[-6:] == 
'_start' and not subdir[-5:] == 
'_stop' \
 
 1202                 and not subdir == 
'log':
 
 1208             if not project_utilities.fast_isdir(out_subpath):
 
 1209                 print(
'No output directory corresponding to subdirectory %s.' % subdir)
 
 1215                 stat_filename = os.path.join(log_subpath, 
'lar.stat')
 
 1216                 if larbatch_posix.exists(stat_filename):
 
 1219                         status = int(larbatch_posix.readlines(stat_filename)[0].
strip())
 
 1221                             print(
'Job in subdirectory %s ended with non-zero exit status %d.' % (
 
 1225                         print(
'Bad file lar.stat in subdirectory %s.' % subdir)
 
 1233                 nev, roots, subhists = 
check_root(out_subpath, log_subpath, stage.datafiletypes)
 
 1235                     if len(roots) == 0 
or nev < 0:
 
 1236                         print(
'Problem with root file(s) in subdirectory %s.' % subdir)
 
 1238                 elif nev < -1 
or len(subhists) == 0:
 
 1239                     print(
'Problem with analysis root file(s) in subdirectory %s.' % subdir)
 
 1245             if not bad 
and has_metadata:
 
 1247                     rootname = os.path.basename(root[0])
 
 1248                     for s 
in list(procmap.keys()):
 
 1249                         oldroots = procmap[s]
 
 1250                         for oldroot 
in oldroots:
 
 1251                             oldrootname = os.path.basename(oldroot[0])
 
 1252                             if rootname == oldrootname:
 
 1253                                 print(
'Duplicate filename %s in subdirectory %s' % (rootname,
 
 1255                                 olddir = os.path.basename(os.path.dirname(oldroot[0]))
 
 1256                                 print(
'Previous subdirectory %s' % olddir)
 
 1261             if not bad 
and has_metadata:
 
 1263                     rootname = os.path.basename(root[0])
 
 1264                     if len(rootname) >= 200:
 
 1265                         print(
'Filename %s in subdirectory %s is longer than 200 characters.' % (
 
 1272             if not bad 
and stage.inputdef != 
'':
 
 1273                 filename1 = os.path.join(log_subpath, 
'sam_project.txt')
 
 1274                 if not larbatch_posix.exists(filename1):
 
 1275                     print(
'Could not find file sam_project.txt')
 
 1277                 filename2 = os.path.join(log_subpath, 
'cpid.txt')
 
 1278                 if not larbatch_posix.exists(filename2):
 
 1279                     print(
'Could not find file cpid.txt')
 
 1282                     sam_project = larbatch_posix.readlines(filename1)[0].
strip()
 
 1283                     if not sam_project 
in sam_projects:
 
 1284                         sam_projects.append(sam_project)
 
 1285                     cpid = larbatch_posix.readlines(filename2)[0].
strip()
 
 1286                     if not cpid 
in cpids:
 
 1292             if not bad 
and (stage.inputlist !=
'' or stage.inputfile != 
''):
 
 1293                 filename = os.path.join(log_subpath, 
'transferred_uris.list')
 
 1294                 if not larbatch_posix.exists(filename):
 
 1295                     print(
'Could not find file transferred_uris.list')
 
 1298                     lines = larbatch_posix.readlines(filename)
 
 1308                 subdir_split = subdir.split(
'_')
 
 1309                 if len(subdir_split) > 1:
 
 1310                     process = int(subdir_split[1])
 
 1311                     if process 
in processes:
 
 1312                         print(
'Duplicate process number')
 
 1315                         processes.append(process)
 
 1320                 procmap[subdir] = roots
 
 1324                 filesana.extend(subhists)
 
 1328                 nev_tot = nev_tot + nev
 
 1329                 nroot_tot = nroot_tot + len(roots)
 
 1334                 bad_workers.append(subdir)
 
 1339                 print(
'Bad subdirectory %s.' % subdir)
 
 1350     contents = larbatch_posix.listdir(stage.bookdir)
 
 1351     if len(contents) == 0:
 
 1352         print(
'Directory %s may be dead.' % stage.bookdir)
 
 1353         print(
'Returning error status without creating any bookkeeping files.')
 
 1358     filelistname = os.path.join(stage.bookdir, 
'files.list')
 
 1361     eventslistname = os.path.join(stage.bookdir, 
'events.list')
 
 1362     eventslist = 
safeopen(eventslistname)
 
 1364     badfilename = os.path.join(stage.bookdir, 
'bad.list')
 
 1367     missingfilesname = os.path.join(stage.bookdir, 
'missing_files.list')
 
 1368     missingfiles = 
safeopen(missingfilesname)
 
 1370     filesanalistname = os.path.join(stage.bookdir, 
'filesana.list')
 
 1371     filesanalist = 
safeopen(filesanalistname)
 
 1373     urislistname = os.path.join(stage.bookdir, 
'transferred_uris.list')
 
 1382     for s 
in list(procmap.keys()):
 
 1384         for root 
in procmap[s]:
 
 1386             filelist.write(
'%s\n' % root[0])
 
 1387             eventslist.write(
'%s %d\n' % root[:2])
 
 1390                 if stream 
not in streams:
 
 1391                     streamlistname = os.path.join(stage.bookdir, 
'files_%s.list' % stream)
 
 1392                     streams[stream] = 
safeopen(streamlistname)
 
 1393                 streams[stream].
write(
'%s\n' % root[0])
 
 1398     for bad_worker 
in bad_workers:
 
 1399         badfile.write(
'%s\n' % bad_worker)
 
 1405     if stage.inputdef == 
'' and not stage.pubs_output:
 
 1407         if len(input_files) > 0:
 
 1408             missing_files = 
list(set(input_files) - set(uris))
 
 1409             for missing_file 
in missing_files:
 
 1410                 missingfiles.write(
'%s\n' % missing_file)
 
 1413             nmiss = stage.num_jobs - len(procmap)
 
 1414             for n 
in range(nmiss):
 
 1415                 missingfiles.write(
'/dev/null\n')
 
 1420     for hist 
in filesana:
 
 1421         filesanalist.write(
'%s\n' % hist)
 
 1426         urislist.write(
'%s\n' % uri)
 
 1431         print(
"%d processes completed successfully." % nproc)
 
 1432         print(
"%d total good histogram files." % len(filesana))
 
 1434         print(
"%d total good events." % nev_tot)
 
 1435         print(
"%d total good root files." % nroot_tot)
 
 1436         print(
"%d total good histogram files." % len(filesana))
 
 1442         project_utilities.addLayerTwo(filelistname)
 
 1445         project_utilities.addLayerTwo(eventslistname)
 
 1450         missingfiles.write(
'\n')
 
 1451     missingfiles.close()
 
 1452     filesanalist.close()
 
 1453     if len(filesana) == 0:
 
 1454         project_utilities.addLayerTwo(filesanalistname)
 
 1456         urislist.write(
'\n')
 
 1458     for stream 
in list(streams.keys()):
 
 1459         streams[stream].
close()
 
 1463     if stage.inputdef != 
'' and not stage.pubs_input:
 
 1467         sam_projects_filename = os.path.join(stage.bookdir, 
'sam_projects.list')
 
 1468         sam_projects_file = 
safeopen(sam_projects_filename)
 
 1469         for sam_project 
in sam_projects:
 
 1470             sam_projects_file.write(
'%s\n' % sam_project)
 
 1471         sam_projects_file.close()
 
 1472         if len(sam_projects) == 0:
 
 1473             project_utilities.addLayerTwo(sam_projects_filename)
 
 1477         cpids_filename = os.path.join(stage.bookdir, 
'cpids.list')
 
 1478         cpids_file = 
safeopen(cpids_filename)
 
 1480             cpids_file.write(
'%s\n' % cpid)
 
 1483             project_utilities.addLayerTwo(cpids_filename)
 
 1490             cpids_list = cpids_list + 
'%s%s' % (sep, cpid)
 
 1492         if cpids_list != 
'':
 
 1493             dim = 
'consumer_process_id %s and consumed_status consumed' % cpids_list
 
 1495             nconsumed = samweb.countFiles(dimensions=dim)
 
 1501         if cpids_list != 
'':
 
 1502             udim = 
'(defname: %s) minus (%s)' % (stage.inputdef, dim)
 
 1504             udim = 
'defname: %s' % stage.inputdef
 
 1505         nunconsumed = samweb.countFiles(dimensions=udim)
 
 1506         nerror = nerror + nunconsumed
 
 1510         print(
'%d sam projects.' % len(sam_projects))
 
 1511         print(
'%d successful consumer process ids.' % len(cpids))
 
 1512         print(
'%d files consumed.' % nconsumed)
 
 1513         print(
'%d files not consumed.' % nunconsumed)
 
 1517         for sam_project 
in sam_projects:
 
 1518             print(
'\nChecking sam project %s' % sam_project)
 
 1520             url = samweb.findProject(sam_project, project_utilities.get_experiment())
 
 1522                 result = samweb.projectSummary(url)
 
 1528                 if 'processes' in result:
 
 1529                     processes = result[
'processes']
 
 1530                     for process 
in processes:
 
 1532                         if 'status' in process:
 
 1533                             if process[
'status'] == 
'active':
 
 1535                         if 'counts' in process:
 
 1536                             counts = process[
'counts']
 
 1537                             if 'delivered' in counts:
 
 1538                                 nd = nd + counts[
'delivered']
 
 1539                             if 'consumed' in counts:
 
 1540                                 nc = nc + counts[
'consumed']
 
 1541                             if 'failed' in counts:
 
 1542                                 nf = nf + counts[
'failed']
 
 1543                 print(
'Status: %s' % result[
'project_status'])
 
 1544                 print(
'%d total processes' % nproc)
 
 1545                 print(
'%d active processes' % nact)
 
 1546                 print(
'%d files in snapshot' % result[
'files_in_snapshot'])
 
 1547                 print(
'%d files delivered' % (nd + nc))
 
 1548                 print(
'%d files consumed' % nc)
 
 1549                 print(
'%d files failed' % nf)
 
 1554     checkfilename = os.path.join(stage.bookdir, 
'checked')
 
 1555     checkfile = 
safeopen(checkfilename)
 
 1556     checkfile.write(
'\n')
 
 1558     project_utilities.addLayerTwo(checkfilename)
 
 1560     if stage.inputdef == 
'' or stage.pubs_input:
 
 1561         print(
'%d processes with errors.' % nerror)
 
 1562         print(
'%d missing files.' % nmiss)
 
 1564         print(
'%d unconsumed files.' % nerror)
 
 1572     if not ana 
and nroot_tot == 0:
 
 1574     if len(procmap) == 0:
 
 1581     if not larbatch_posix.isdir(stage.outdir):
 
 1582         print(
'Output directory %s does not exist.' % stage.outdir)
 
 1585     if not larbatch_posix.isdir(stage.bookdir):
 
 1586         print(
'Log directory %s does not exist.' % stage.bookdir)
 
 1589     print(
'Checking directory %s' % stage.bookdir)
 
 1598     transferredFiles = []       
 
 1607     for log_subpath, subdirs, files 
in larbatch_posix.walk(stage.bookdir):
 
 1611         if len(subdirs) != 0:
 
 1615         if log_subpath[-6:] == 
'_start' or log_subpath[-5:] == 
'_stop':
 
 1616             filename = os.path.join(log_subpath, 
'sam_project.txt')
 
 1617             if larbatch_posix.exists(filename):
 
 1618                 sam_project = larbatch_posix.readlines(filename)[0].
strip()
 
 1619                 if sam_project != 
'' and not sam_project 
in sam_projects:
 
 1620                     sam_projects.append(sam_project)
 
 1624         print(
'Doing quick check of directory %s.' % log_subpath)
 
 1626         subdir = os.path.relpath(log_subpath, stage.bookdir)
 
 1628         out_subpath = os.path.join(stage.outdir, subdir)
 
 1629         dirok = project_utilities.fast_isdir(log_subpath)
 
 1636         missingfilesname = os.path.join(log_subpath, 
'missing_files.list')
 
 1642             missingfiles = project_utilities.saferead(missingfilesname)
 
 1645             print(
'Cannot open file: %s' % missingfilesname)
 
 1649         if validateOK == 1 
and len(missingfiles) == 0:
 
 1650             print(
'%s exists, but is empty' % missingfilesname)
 
 1655             line = missingfiles[0]
 
 1656             line = line.strip(
'\n')
 
 1657             if( int(line) != 0 ):
 
 1672         if stage.inputdef != 
'':
 
 1674             filename1 = os.path.join(log_subpath, 
'sam_project.txt')
 
 1675             if not larbatch_posix.exists(filename1):
 
 1676                 print(
'Could not find file sam_project.txt')
 
 1679                 sam_project = larbatch_posix.readlines(filename1)[0].
strip()
 
 1680                 if not sam_project 
in sam_projects:
 
 1681                     sam_projects.append(sam_project)
 
 1683             filename2 = os.path.join(log_subpath, 
'cpid.txt')
 
 1684             if not larbatch_posix.exists(filename2):
 
 1685                 print(
'Could not find file cpid.txt')
 
 1688                 cpid = larbatch_posix.readlines(filename2)[0].
strip()
 
 1689                 if not cpid 
in cpids:
 
 1692         filelistsrc = os.path.join(log_subpath, 
'files.list')
 
 1695         if( tmpArray == [ -1 ] ):
 
 1698             goodFiles.extend(tmpArray)
 
 1700         fileanalistsrc = os.path.join(log_subpath, 
'filesana.list')
 
 1703         if( 
not tmpArray == [ -1 ] ):
 
 1704             goodAnaFiles.extend(tmpArray)
 
 1706         eventlistsrc = os.path.join(log_subpath, 
'events.list')
 
 1710         if( tmpArray == [ -1 ] ):
 
 1713             eventLists.extend(tmpArray)
 
 1716         badfilesrc = os.path.join(log_subpath, 
'bad.list')
 
 1722         if( tmpArray == [ -1 ] ):
 
 1725             badLists.extend(tmpArray)
 
 1728         missingfilesrc  = os.path.join(log_subpath, 'missing_files.list') 
 1730         tmpArray = scan_file(missingfilesrc) 
 1732         if( tmpArray == [ -1 ] ): 
 1735             missingLists.extend(tmpArray) 
 1748         urislistsrc = os.path.join(log_subpath, 
'transferred_uris.list')
 
 1753         if( tmpArray == [ -1 ] ):
 
 1756             transferredFiles.extend(tmpArray)
 
 1758         streamList = larbatch_posix.listdir(log_subpath)
 
 1760         for stream 
in streamList:
 
 1761             if( stream[:6] != 
"files_" ):
 
 1763             streamfilesrc = os.path.join(log_subpath, stream)
 
 1766             if( tmpArray == [ -1 ] ):
 
 1769                 if(streamLists.get(stream, 
"empty") == 
"empty" ):
 
 1770                     streamLists[stream] = tmpArray
 
 1772                     streamLists[stream].extend(tmpArray)
 
 1775             goodLogDirs.add(log_subpath)
 
 1777     checkfilename = os.path.join(stage.bookdir, 
'checked')
 
 1778     checkfile = 
safeopen(checkfilename)
 
 1779     checkfile.write(
'\n')
 
 1783     filelistdest = os.path.join(stage.bookdir, 
'files.list')
 
 1784     if larbatch_posix.exists(filelistdest):
 
 1786         larbatch_posix.remove(filelistdest)
 
 1787     if len(goodLogDirs) == 1:
 
 1788         src = 
'%s/files.list' % goodLogDirs.copy().pop()
 
 1790         larbatch_posix.symlink(src, filelistdest)
 
 1794         for goodFile 
in goodFiles:
 
 1796             inputList.write(
"%s\n" % goodFile)
 
 1798     if len(goodFiles) == 0:
 
 1799         project_utilities.addLayerTwo(filelistdest)
 
 1802     fileanalistdest = os.path.join(stage.bookdir, 
'filesana.list')
 
 1803     if larbatch_posix.exists(fileanalistdest):
 
 1805         larbatch_posix.remove(fileanalistdest)
 
 1806     if len(goodLogDirs) == 1:
 
 1807         src = 
'%s/filesana.list' % goodLogDirs.copy().pop()
 
 1809         larbatch_posix.symlink(src, fileanalistdest)
 
 1812         anaList = 
safeopen(fileanalistdest)
 
 1813         for goodAnaFile 
in goodAnaFiles:
 
 1815             anaList.write(
"%s\n" % goodAnaFile)
 
 1817         if len(goodAnaFiles) == 0:
 
 1818             project_utilities.addLayerTwo(fileanalistdest)
 
 1821     eventlistdest = os.path.join(stage.bookdir, 
'events.list')
 
 1822     if larbatch_posix.exists(eventlistdest):
 
 1824         larbatch_posix.remove(eventlistdest)
 
 1825     if len(goodLogDirs) == 1:
 
 1826         src = 
'%s/events.list' % goodLogDirs.copy().pop()
 
 1828         larbatch_posix.symlink(src, eventlistdest)
 
 1831         eventsOutList = 
safeopen(eventlistdest)
 
 1832         for event 
in eventLists:
 
 1834             eventsOutList.write(
"%s\n" % event)
 
 1835         eventsOutList.close()
 
 1836         if len(eventLists) == 0:
 
 1837             project_utilities.addLayerTwo(eventlistdest)
 
 1840     if(len(badLists) > 0):
 
 1841         badlistdest = os.path.join(stage.bookdir, 
'bad.list')
 
 1843         for bad 
in badLists:
 
 1844             badOutList.write(
"%s\n" % bad)
 
 1850     if stage.inputdef == 
'' and not stage.pubs_output:
 
 1852         if len(input_files) > 0:
 
 1853             missing_files = 
list(set(input_files) - set(transferredFiles))
 
 1855     if len(missing_files) > 0:
 
 1856         missinglistdest = os.path.join(stage.bookdir, 
'missing_files.list')
 
 1857         missingOutList = 
safeopen(missinglistdest)
 
 1858         for missing 
in missing_files:
 
 1859             missingOutList.write(
"%s\n" % missing)
 
 1860         missingOutList.close()
 
 1864     urilistdest = os.path.join(stage.bookdir, 
'transferred_uris.list')
 
 1865     if larbatch_posix.exists(urilistdest):
 
 1867         larbatch_posix.remove(urilistdest)
 
 1868     if len(goodLogDirs) == 1 
and len(transferredFiles) > 0:
 
 1869         src = 
'%s/transferred_uris.list' % goodLogDirs.copy().pop()
 
 1871         larbatch_posix.symlink(src, urilistdest)
 
 1875         for uri 
in transferredFiles:
 
 1877             uriOutList.write(
"%s\n" % uri)
 
 1879         if len(transferredFiles) == 0:
 
 1880             project_utilities.addLayerTwo(urilistdest)
 
 1882     if stage.inputdef != 
'':
 
 1883         samprojectdest = os.path.join(stage.bookdir, 
'sam_projects.list')
 
 1884         if larbatch_posix.exists(samprojectdest):
 
 1886             larbatch_posix.remove(samprojectdest)
 
 1887         if len(goodLogDirs) == 1:
 
 1888             src = 
'%s/sam_project.txt' % goodLogDirs.copy().pop()
 
 1890             larbatch_posix.symlink(src, samprojectdest)
 
 1893             samprojectfile = 
safeopen(samprojectdest)
 
 1894             for sam 
in sam_projects:
 
 1895                 samprojectfile.write(
"%s\n" % sam)
 
 1896             samprojectfile.close()
 
 1897             if len(sam_projects) == 0:
 
 1898                 project_utilities.addLayerTwo(samprojectdest)
 
 1900         cpiddest = os.path.join(stage.bookdir, 
'cpids.list')
 
 1901         if larbatch_posix.exists(cpiddest):
 
 1903             larbatch_posix.remove(cpiddest)
 
 1904         if len(goodLogDirs) == 1:
 
 1905             src = 
'%s/cpid.txt' % goodLogDirs.copy().pop()
 
 1907             larbatch_posix.symlink(src, cpiddest)
 
 1912                 cpidfile.write(
"%s \n" % cp)
 
 1915                 project_utilities.addLayerTwo(cpiddest)
 
 1918     for stream 
in streamLists:
 
 1919         streamdest = os.path.join(stage.bookdir, stream)
 
 1920         if larbatch_posix.exists(streamdest):
 
 1922             larbatch_posix.remove(streamdest)
 
 1923         if len(goodLogDirs) == 1:
 
 1924             src = 
'%s/%s' % (goodLogDirs.copy().pop(), stream)
 
 1926             larbatch_posix.symlink(src, streamdest)
 
 1929             streamOutList = 
safeopen(streamdest)
 
 1930             for line 
in streamLists[stream]:
 
 1931                 streamOutList.write(
"%s\n" % line)
 
 1932             streamOutList.close()
 
 1933             if len(streamLists[stream]) == 0:
 
 1934                 project_utilities.addLayerTwo(streamdest)
 
 1940     print(
'Number of errors = %d' % nErrors)
 
 1965     for dirpath, dirnames, filenames 
in larbatch_posix.walk(stage.bookdir):
 
 1966         for filename 
in filenames:
 
 1967             if filename == 
'env.txt':
 
 1978                 envpath = os.path.join(dirpath, filename)
 
 1979                 vars = larbatch_posix.readlines(envpath)
 
 1984                     varsplit = var.split(
'=', 1)
 
 1985                     name = varsplit[0].
strip()
 
 1986                     if name == 
'JOBSUBPARENTJOBID':
 
 1987                         logid = varsplit[1].
strip()
 
 1992                         logsplit = logid.split(
'@', 1)
 
 1993                         cluster_process = logsplit[0]
 
 1994                         server = logsplit[1]
 
 1995                         cluster = cluster_process.split(
'.', 1)[0]
 
 1996                         logid = cluster + 
'.0' + 
'@' + server
 
 1997                         logids.append(logid)
 
 2004                         varsplit = var.split(
'=', 1)
 
 2005                         name = varsplit[0].
strip()
 
 2006                         if name == 
'JOBSUBJOBID':
 
 2007                             logid = varsplit[1].
strip()
 
 2012                             logsplit = logid.split(
'@', 1)
 
 2013                             cluster_process = logsplit[0]
 
 2014                             server = logsplit[1]
 
 2015                             cluster = cluster_process.split(
'.', 1)[0]
 
 2016                             logid = cluster + 
'.0' + 
'@' + server
 
 2017                             logids.append(logid)
 
 2026         logdir = os.path.join(stage.bookdir, 
'log')
 
 2027         if larbatch_posix.exists(logdir):
 
 2028             larbatch_posix.rmtree(logdir)
 
 2029         larbatch_posix.mkdir(logdir)
 
 2033         for logid 
in set(logids):
 
 2039             print(
'Fetching log files for id %s' % logid)
 
 2040             command = [
'jobsub_fetchlog']
 
 2041             if project.server != 
'-' and project.server != 
'':
 
 2042                 command.append(
'--jobsub-server=%s' % project.server)
 
 2043             command.append(
'--jobid=%s' % logid)
 
 2044             command.append(
'--dest-dir=%s' % logdir)
 
 2045             jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
 2046             jobout, joberr = jobinfo.communicate()
 
 2051                 raise JobsubError(command, rc, jobout, joberr)
 
 2063         print(
'Failed to fetch log files.')
 
 2084     listname = 
'files.list' 
 2086         listname = 
'filesana.list' 
 2087     fnlist = os.path.join(logdir, listname)
 
 2088     if larbatch_posix.exists(fnlist):
 
 2089         roots = larbatch_posix.readlines(fnlist)
 
 2091         raise RuntimeError(
'No %s file found %s, run project.py --check' % (listname, fnlist))
 
 2095         fn = os.path.basename(path)
 
 2096         dirpath = os.path.dirname(path)
 
 2097         dirname = os.path.relpath(dirpath, outdir)
 
 2101         has_metadata = 
False 
 2103             md = samweb.getMetadata(filenameorid=fn)
 
 2105         except samweb_cli.exceptions.FileNotFound:
 
 2111             print(
'Metadata OK: %s' % fn)
 
 2114                 print(
'Declaring: %s' % fn)
 
 2115                 jsonfile = os.path.join(logdir, os.path.join(dirname, fn)) + 
'.json' 
 2117                 if larbatch_posix.exists(jsonfile):
 
 2118                     mdlines = larbatch_posix.readlines(jsonfile)
 
 2120                     for line 
in mdlines:
 
 2121                         mdtext = mdtext + line
 
 2123                         md = json.loads(mdtext)
 
 2131                     expSpecificMetaData = expMetaData(os.environ[
'SAM_EXPERIMENT'],larbatch_posix.root_stream(path))
 
 2132                     md = expSpecificMetaData.getmetadata(mdjson)
 
 2134                     project_utilities.test_kca()
 
 2140                         samweb.declareFile(md=md)
 
 2145                         print(
'SAM declare failed.')
 
 2149                     print(
'No sam metadata found for %s.' % fn)
 
 2151                 print(
'Not declared: %s' % fn)
 
 2166     result = samweb.listFilesSummary(dimensions=dim)
 
 2167     for key 
in list(result.keys()):
 
 2168         print(
'%s: %s' % (key, result[key]))
 
 2195         desc = samweb.descDefinition(defname=defname)
 
 2197     except samweb_cli.exceptions.DefinitionNotFound:
 
 2203         print(
'Definition already exists: %s' % defname)
 
 2206             print(
'Creating definition %s.' % defname)
 
 2207             project_utilities.test_kca()
 
 2208             samweb.createDefinition(defname=defname, dims=dim)
 
 2211             print(
'Definition should be created: %s' % defname)
 
 2225     result = samweb.listFilesSummary(defname=defname)
 
 2226     for key 
in list(result.keys()):
 
 2227         print(
'%s: %s' % (key, result[key]))
 
 2246         desc = samweb.descDefinition(defname=defname)
 
 2248     except samweb_cli.exceptions.DefinitionNotFound:
 
 2254         print(
'Deleting definition: %s' % defname)
 
 2255         project_utilities.test_kca()
 
 2256         samweb.deleteDefinition(defname=defname)
 
 2258         print(
'No such definition: %s' % defname)
 
 2268         print(
'Adding disk locations.')
 
 2270         print(
'Cleaning disk locations.')
 
 2272         print(
'Removing disk locations.')
 
 2274         print(
'Uploading to FTS.')
 
 2276         print(
'Checking disk locations.')
 
 2284     filelist = samweb.listFiles(dimensions=dim, stream=
False)
 
 2289     for filename 
in filelist:
 
 2290         disk_dict[filename] = []
 
 2291     for out_subpath, subdirs, files 
in larbatch_posix.walk(outdir):
 
 2295         if len(subdirs) != 0:
 
 2300                 disk_dict[fn].append(out_subpath)
 
 2304     for filename 
in filelist:
 
 2305         disk_locs = disk_dict[filename]
 
 2306         sam_locs = samweb.locateFile(filenameorid=filename)
 
 2307         if len(sam_locs) == 0 
and not upload:
 
 2308             print(
'No location: %s' % filename)
 
 2315         for disk_loc 
in disk_locs:
 
 2317             for sam_loc 
in sam_locs:
 
 2318                 if sam_loc[
'location_type'] == 
'disk':
 
 2319                     if disk_loc == sam_loc[
'location'].split(
':')[-1]:
 
 2323                 locs_to_add.append(disk_loc)
 
 2332         for sam_loc 
in sam_locs:
 
 2333             if sam_loc[
'location_type'] == 
'disk':
 
 2338                     locs_to_remove.append(sam_loc[
'location'])
 
 2346                     local_path = os.path.join(sam_loc[
'location'].split(
':')[-1], filename)
 
 2347                     if not larbatch_posix.exists(local_path):
 
 2348                         locs_to_remove.append(sam_loc[
'location'])
 
 2356         should_upload = 
False 
 2357         if upload 
and len(disk_locs) > 0:
 
 2358             should_upload = 
True 
 2359             for sam_loc 
in sam_locs:
 
 2360                 if sam_loc[
'location_type'] == 
'tape':
 
 2361                     should_upload = 
False 
 2364                 dropbox = project_utilities.get_dropbox(filename)
 
 2365                 if not larbatch_posix.exists(dropbox):
 
 2366                     print(
'Making dropbox directory %s.' % dropbox)
 
 2367                     larbatch_posix.makedirs(dropbox)
 
 2368                 locs_to_upload[disk_locs[0]] = dropbox
 
 2372         for loc 
in locs_to_add:
 
 2373             node = project_utilities.get_bluearc_server()
 
 2374             if loc[0:6] == 
'/pnfs/':
 
 2375                 node = project_utilities.get_dcache_server()
 
 2376             loc = node + loc.split(
':')[-1]
 
 2378                 print(
'Adding location: %s.' % loc)
 
 2379                 project_utilities.test_kca()
 
 2380                 samweb.addFileLocation(filenameorid=filename, location=loc)
 
 2382                 print(
'Can add location: %s.' % loc)
 
 2384         for loc 
in locs_to_remove:
 
 2386                 print(
'Removing location: %s.' % loc)
 
 2387                 project_utilities.test_kca()
 
 2388                 samweb.removeFileLocation(filenameorid=filename, location=loc)
 
 2390                 print(
'Should remove location: %s.' % loc)
 
 2392         for loc 
in list(locs_to_upload.keys()):
 
 2393             dropbox = locs_to_upload[loc]
 
 2397             if not larbatch_posix.isdir(dropbox):
 
 2398                 print(
'Dropbox directory %s does not exist.' % dropbox)
 
 2403                 dropbox_filename = os.path.join(dropbox, filename)
 
 2404                 if larbatch_posix.exists(dropbox_filename):
 
 2405                     print(
'File %s already exists in dropbox %s.' % (filename, dropbox))
 
 2410                     loc_filename = os.path.join(loc, filename)
 
 2414                     if project_utilities.mountpoint(loc_filename) == \
 
 2415                             project_utilities.mountpoint(dropbox_filename):
 
 2416                         print(
'Symlinking %s to dropbox directory %s.' % (filename, dropbox))
 
 2417                         relpath = os.path.relpath(os.path.realpath(loc_filename), dropbox)
 
 2418                         print(
'relpath=',relpath)
 
 2419                         print(
'dropbox_filename=',dropbox_filename)
 
 2420                         larbatch_posix.symlink(relpath, dropbox_filename)
 
 2423                         print(
'Copying %s to dropbox directory %s.' % (filename, dropbox))
 
 2424                         larbatch_posix.copy(loc_filename, dropbox_filename)
 
 2446     filelist = samweb.listFiles(dimensions=dim, stream=
True)
 
 2449             filename = next(filelist)
 
 2450         except StopIteration:
 
 2460         sam_locs = samweb.locateFile(filenameorid=filename)
 
 2461         for sam_loc 
in sam_locs:
 
 2462             if sam_loc[
'location_type'] == 
'tape':
 
 2467             print(
'On tape: %s' % filename)
 
 2471             print(
'Not on tape: %s' % filename)
 
 2473     print(
'%d files.' % ntot)
 
 2474     print(
'%d files need to be store on tape.' % nbad)
 
 2494     tmpdir = tempfile.mkdtemp()
 
 2498     tmpworkdir = tempfile.mkdtemp()
 
 2504     jobsub_workdir_files_args = []
 
 2508     input_list_name = 
'' 
 2509     if stage.inputlist != 
'':
 
 2510         input_list_name = os.path.basename(stage.inputlist)
 
 2511         work_list_name = os.path.join(tmpworkdir, input_list_name)
 
 2512         if stage.inputlist != work_list_name:
 
 2513             input_files = larbatch_posix.readlines(stage.inputlist)
 
 2514             print(
'Making input list.')
 
 2515             work_list = 
safeopen(work_list_name)
 
 2516             for input_file 
in input_files:
 
 2517                 print(
'Adding input file %s' % input_file)
 
 2518                 work_list.write(
'%s\n' % input_file.strip())
 
 2520             print(
'Done making input list.')
 
 2524     fcls = project.get_fcl(stage.fclname)
 
 2529       workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
 
 2530       if os.path.abspath(fcl) != os.path.abspath(workfcl):
 
 2531         larbatch_posix.copy(fcl, workfcl)
 
 2539     wrapper_fcl_name = os.path.join(tmpworkdir, 
'wrapper.fcl')
 
 2540     wrapper_fcl = 
safeopen(wrapper_fcl_name)
 
 2542     original_project_name = project.name
 
 2543     original_stage_name = stage.name
 
 2544     original_project_version = project.version
 
 2547       wrapper_fcl.write(
'#---STAGE %d\n' % stageNum)
 
 2548       wrapper_fcl.write(
'#include "%s"\n' % os.path.basename(fcl))
 
 2549       wrapper_fcl.write(
'\n')
 
 2554       xml_has_metadata = project.file_type != 
'' or \
 
 2555                        project.run_type != 
'' 
 2556       if xml_has_metadata:
 
 2560         if project.release_tag != 
'':
 
 2561             wrapper_fcl.write(
'services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
 
 2562                                   project.release_tag)
 
 2564             wrapper_fcl.write(
'services.FileCatalogMetadata.applicationVersion: "test"\n')
 
 2565         if project.file_type:
 
 2566             wrapper_fcl.write(
'services.FileCatalogMetadata.fileType: "%s"\n' % \
 
 2568         if project.run_type:
 
 2569             wrapper_fcl.write(
'services.FileCatalogMetadata.runType: "%s"\n' % \
 
 2575         if stageNum < len(stage.project_name) 
and stage.project_name[stageNum] != 
'':
 
 2576             project.name = stage.project_name[stageNum]
 
 2577         if stageNum < len(stage.stage_name) 
and stage.stage_name[stageNum] != 
'':
 
 2578             stage.name = stage.stage_name[stageNum]
 
 2579         if stageNum < len(stage.project_version) 
and stage.project_version[stageNum] != 
'':
 
 2580             project.version = stage.project_version[stageNum]
 
 2581         sam_metadata = project_utilities.get_sam_metadata(project, stage)
 
 2583             wrapper_fcl.write(sam_metadata)
 
 2584         project.name = original_project_name
 
 2585         stage.name = original_stage_name
 
 2586         project.version = original_project_version
 
 2591       if (
not stage.pubs_input 
and stage.pubs_output) 
or stage.output_run:
 
 2592         wrapper_fcl.write(
'source.firstRun: %d\n' % stage.output_run)
 
 2598       if stage.maxfluxfilemb != 0 
and stageNum == 0:
 
 2599          wrapper_fcl.write(
'physics.producers.generator.FluxCopyMethod: "IFDH"\n')
 
 2600          wrapper_fcl.write(
'physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
 
 2601       wrapper_fcl.write(
'#---END_STAGE\n')
 
 2602       stageNum = 1 + stageNum
 
 2611     abssetupscript = project_utilities.get_setup_script_path()
 
 2613     if not abssetupscript.startswith(
'/cvmfs/'):
 
 2614         setupscript = os.path.join(stage.workdir,
'setup_experiment.sh')
 
 2615         larbatch_posix.copy(abssetupscript, setupscript)
 
 2616         jobsub_workdir_files_args.extend([
'-f', setupscript])
 
 2621     if stage.batchname != 
'':
 
 2622         workname = stage.batchname
 
 2624         workname = 
'%s-%s-%s' % (stage.name, project.name, project.release_tag)
 
 2625     workname = workname + os.path.splitext(stage.script)[1]
 
 2627     workscript = os.path.join(tmpdir, workname)
 
 2628     if stage.script != workscript:
 
 2629         larbatch_posix.copy(stage.script, workscript)
 
 2633     workstartscript = 
'' 
 2635     if stage.start_script != 
'':
 
 2636         workstartname = 
'start-%s' % workname
 
 2638         workstartscript = os.path.join(tmpdir, workstartname)
 
 2639         if stage.start_script != workstartscript:
 
 2640             larbatch_posix.copy(stage.start_script, workstartscript)
 
 2646     if stage.stop_script != 
'':
 
 2647         workstopname = 
'stop-%s' % workname
 
 2649         workstopscript = os.path.join(tmpdir, workstopname)
 
 2650         if stage.stop_script != workstopscript:
 
 2651             larbatch_posix.copy(stage.stop_script, workstopscript)
 
 2655     for init_script 
in stage.init_script:
 
 2656         if init_script != 
'':
 
 2657             if not larbatch_posix.exists(init_script):
 
 2658                 raise RuntimeError(
'Worker initialization script %s does not exist.\n' % \
 
 2660             work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
 
 2661             if init_script != work_init_script:
 
 2662                 larbatch_posix.copy(init_script, work_init_script)
 
 2666     n = len(stage.init_script)
 
 2668         stage.init_script = 
'' 
 2670         stage.init_script = stage.init_script[0]
 
 2675         work_init_wrapper = os.path.join(tmpworkdir, 
'init_wrapper.sh')
 
 2676         f = 
open(work_init_wrapper, 
'w')
 
 2677         f.write(
'#! /bin/bash\n')
 
 2678         for init_script 
in stage.init_script:
 
 2680             f.write(
'echo "Executing %s"\n' % os.path.basename(init_script))
 
 2681             f.write(
'./%s\n' % os.path.basename(init_script))
 
 2682             f.write(
'status=$?\n')
 
 2683             f.write(
'echo "%s finished with status $status"\n' % os.path.basename(init_script))
 
 2684             f.write(
'if [ $status -ne 0 ]; then\n')
 
 2685             f.write(
'  exit $status\n')
 
 2688         f.write(
'echo "Done executing initialization scripts."\n')
 
 2690         stage.init_script = work_init_wrapper
 
 2694     for init_source 
in stage.init_source:
 
 2695         if init_source != 
'':
 
 2696             if not larbatch_posix.exists(init_source):
 
 2697                 raise RuntimeError(
'Worker initialization source script %s does not exist.\n' % \
 
 2699         work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
 
 2700         if init_source != work_init_source:
 
 2701             larbatch_posix.copy(init_source, work_init_source)
 
 2705     n = len(stage.init_source)
 
 2707         stage.init_source = 
'' 
 2709         stage.init_source = stage.init_source[0]
 
 2715         work_init_source_wrapper = os.path.join(tmpworkdir, 
'init_source_wrapper.sh')
 
 2716         f = 
open(work_init_source_wrapper, 
'w')
 
 2717         for init_source 
in stage.init_source:
 
 2719             f.write(
'echo "Sourcing %s"\n' % os.path.basename(init_source))
 
 2720             f.write(
'source %s\n' % os.path.basename(init_source))
 
 2722         f.write(
'echo "Done sourcing initialization scripts."\n')
 
 2724         stage.init_source = work_init_source_wrapper
 
 2728     for end_script 
in stage.end_script:
 
 2729         if end_script != 
'':
 
 2730             if not larbatch_posix.exists(end_script):
 
 2731                 raise RuntimeError(
'Worker end-of-job script %s does not exist.\n' % end_script)
 
 2732             work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
 
 2733             if end_script != work_end_script:
 
 2734                 larbatch_posix.copy(end_script, work_end_script)
 
 2738     n = len(stage.end_script)
 
 2740         stage.end_script = 
'' 
 2742         stage.end_script = stage.end_script[0]
 
 2747         work_end_wrapper = os.path.join(tmpworkdir, 
'end_wrapper.sh')
 
 2748         f = 
open(work_end_wrapper, 
'w')
 
 2749         f.write(
'#! /bin/bash\n')
 
 2750         for end_script 
in stage.end_script:
 
 2752             f.write(
'echo "Executing %s"\n' % os.path.basename(end_script))
 
 2753             f.write(
'./%s\n' % os.path.basename(end_script))
 
 2754             f.write(
'status=$?\n')
 
 2755             f.write(
'echo "%s finished with status $status"\n' % os.path.basename(end_script))
 
 2756             f.write(
'if [ $status -ne 0 ]; then\n')
 
 2757             f.write(
'  exit $status\n')
 
 2760         f.write(
'echo "Done executing finalization scripts."\n')
 
 2762         stage.end_script = work_end_wrapper
 
 2766     for istage 
in stage.mid_source:
 
 2767         for mid_source 
in stage.mid_source[istage]:
 
 2768             if mid_source != 
'':
 
 2769                 if not larbatch_posix.exists(mid_source):
 
 2770                     raise RuntimeError(
'Worker midstage initialization source script %s does not exist.\n' % mid_source)
 
 2771                 work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
 
 2772                 if mid_source != work_mid_source:
 
 2773                     larbatch_posix.copy(mid_source, work_mid_source)
 
 2779     if len(stage.mid_source) > 0:
 
 2780         work_mid_source_wrapper = os.path.join(tmpworkdir, 
'mid_source_wrapper.sh')
 
 2781         f = 
open(work_mid_source_wrapper, 
'w')
 
 2782         for istage 
in stage.mid_source:
 
 2783             for mid_source 
in stage.mid_source[istage]:
 
 2784                 f.write(
'if [ $stage -eq %d ]; then\n' % istage)
 
 2786                 f.write(
'  echo "Sourcing %s"\n' % os.path.basename(mid_source))
 
 2787                 f.write(
'  source %s\n' % os.path.basename(mid_source))
 
 2790         f.write(
'echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
 
 2792         stage.mid_source = work_mid_source_wrapper
 
 2794         stage.mid_source = 
'' 
 2798     for istage 
in stage.mid_script:
 
 2799         for mid_script 
in stage.mid_script[istage]:
 
 2800             if mid_script != 
'':
 
 2801                 if not larbatch_posix.exists(mid_script):
 
 2802                     raise RuntimeError(
'Worker midstage finalization script %s does not exist.\n' % mid_script)
 
 2803                 work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
 
 2804                 if mid_script != work_mid_script:
 
 2805                     larbatch_posix.copy(mid_script, work_mid_script)
 
 2810     if len(stage.mid_script) > 0:
 
 2811         work_mid_wrapper = os.path.join(tmpworkdir, 
'mid_wrapper.sh')
 
 2812         f = 
open(work_mid_wrapper, 
'w')
 
 2813         f.write(
'#! /bin/bash\n')
 
 2814         f.write(
'stage=$1\n')
 
 2815         for istage 
in stage.mid_script:
 
 2816             for mid_script 
in stage.mid_script[istage]:
 
 2817                 f.write(
'if [ $stage -eq %d ]; then\n' % istage)
 
 2819                 f.write(
'  echo "Executing %s"\n' % os.path.basename(mid_script))
 
 2820                 f.write(
'  ./%s\n' % os.path.basename(mid_script))
 
 2821                 f.write(
'  status=$?\n')
 
 2822                 f.write(
'  echo "%s finished with status $status"\n' % os.path.basename(mid_script))
 
 2823                 f.write(
'  if [ $status -ne 0 ]; then\n')
 
 2824                 f.write(
'    exit $status\n')
 
 2828         f.write(
'echo "Done executing midstage finalization scripts for stage $stage."\n')
 
 2830         stage.mid_script = work_mid_wrapper
 
 2832         stage.mid_script = 
'' 
 2836     helpers = (
'root_metadata.py',
 
 2839                'validate_in_job.py',
 
 2844     for helper 
in helpers:
 
 2848         jobinfo = subprocess.Popen([
'which', helper],
 
 2849                                    stdout=subprocess.PIPE,
 
 2850                                    stderr=subprocess.PIPE)
 
 2851         jobout, joberr = jobinfo.communicate()
 
 2855         helper_path = jobout.splitlines()[0].
strip()
 
 2857             work_helper = os.path.join(tmpworkdir, helper)
 
 2858             if helper_path != work_helper:
 
 2859                 larbatch_posix.copy(helper_path, work_helper)
 
 2861             print(
'Helper script %s not found.' % helper)
 
 2866     helper_modules = (
'larbatch_posix',
 
 2867                       'project_utilities',
 
 2868                       'larbatch_utilities',
 
 2869                       'experiment_utilities',
 
 2872     for helper_module 
in helper_modules:
 
 2876         jobinfo = subprocess.Popen([
'python'],
 
 2877                                    stdin=subprocess.PIPE,
 
 2878                                    stdout=subprocess.PIPE,
 
 2879                                    stderr=subprocess.PIPE)
 
 2880         cmd = 
'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
 
 2882         jobout, joberr = jobinfo.communicate()
 
 2886         helper_path = jobout.splitlines()[-1].
strip()
 
 2889             work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
 
 2890             if helper_path != work_helper:
 
 2891                 larbatch_posix.copy(helper_path, work_helper)
 
 2893             print(
'Helper python module %s not found.' % helper_module)
 
 2900         checked_file = os.path.join(stage.bookdir, 
'checked')
 
 2901         if not larbatch_posix.exists(checked_file):
 
 2902             raise RuntimeError(
'Wait for any running jobs to finish and run project.py --check')
 
 2907         bad_filename = os.path.join(stage.bookdir, 
'bad.list')
 
 2908         if larbatch_posix.exists(bad_filename):
 
 2909             lines = larbatch_posix.readlines(bad_filename)
 
 2911                 bad_subdir = line.strip()
 
 2912                 if bad_subdir != 
'':
 
 2913                     bad_path = os.path.join(stage.outdir, bad_subdir)
 
 2914                     if larbatch_posix.exists(bad_path):
 
 2915                         print(
'Deleting %s' % bad_path)
 
 2916                         larbatch_posix.rmtree(bad_path)
 
 2917                     bad_path = os.path.join(stage.logdir, bad_subdir)
 
 2918                     if larbatch_posix.exists(bad_path):
 
 2919                         print(
'Deleting %s' % bad_path)
 
 2920                         larbatch_posix.rmtree(bad_path)
 
 2921                     bad_path = os.path.join(stage.bookdir, bad_subdir)
 
 2922                     if larbatch_posix.exists(bad_path):
 
 2923                         print(
'Deleting %s' % bad_path)
 
 2924                         larbatch_posix.rmtree(bad_path)
 
 2931         if stage.inputdef == 
'':
 
 2932             missing_filename = os.path.join(stage.bookdir, 
'missing_files.list')
 
 2933             if larbatch_posix.exists(missing_filename):
 
 2934                 lines = larbatch_posix.readlines(missing_filename)
 
 2936                     words = line.split()
 
 2938                         missing_files.append(words[0])
 
 2939             makeup_count = len(missing_files)
 
 2940             print(
'Makeup list contains %d files.' % makeup_count)
 
 2942         if input_list_name != 
'':
 
 2943             work_list_name = os.path.join(tmpworkdir, input_list_name)
 
 2944             if larbatch_posix.exists(work_list_name):
 
 2945                 larbatch_posix.remove(work_list_name)
 
 2946             work_list = 
safeopen(work_list_name)
 
 2947             for missing_file 
in missing_files:
 
 2948                 work_list.write(
'%s\n' % missing_file)
 
 2955         if stage.inputdef == 
'' and stage.inputfile == 
'' and stage.inputlist == 
'':
 
 2956             procs = set(range(stage.num_jobs))
 
 2961             output_files = os.path.join(stage.bookdir, 
'files.list')
 
 2962             if larbatch_posix.exists(output_files):
 
 2963                 lines = larbatch_posix.readlines(output_files)
 
 2965                     dir = os.path.basename(os.path.dirname(line))
 
 2966                     dir_parts = dir.split(
'_')
 
 2967                     if len(dir_parts) > 1:
 
 2968                         proc = int(dir_parts[1])
 
 2971                 if len(procs) != makeup_count:
 
 2972                     raise RuntimeError(
'Makeup process list has different length than makeup count.')
 
 2977                     procmap = 
'procmap.txt' 
 2978                     procmap_path = os.path.join(tmpworkdir, procmap)
 
 2979                     procmap_file = 
safeopen(procmap_path)
 
 2981                         procmap_file.write(
'%d\n' % proc)
 
 2982                     procmap_file.close()
 
 2991         cpids_filename = os.path.join(stage.bookdir, 
'cpids.list')
 
 2992         if larbatch_posix.exists(cpids_filename):
 
 2993             cpids_files = larbatch_posix.readlines(cpids_filename)
 
 2994             for line 
in cpids_files:
 
 2995                 cpids.append(line.strip())
 
 3001             project_utilities.test_kca()
 
 3002             makeup_defname = samweb.makeProjectName(stage.inputdef) + 
'_makeup' 
 3009                 cpids_list = cpids_list + 
'%s%s' % (sep, cpid)
 
 3014             dim = 
'(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)
 
 3018             print(
'Creating makeup sam dataset definition %s' % makeup_defname)
 
 3019             project_utilities.test_kca()
 
 3020             samweb.createDefinition(defname=makeup_defname, dims=dim)
 
 3021             makeup_count = samweb.countFiles(defname=makeup_defname)
 
 3022             print(
'Makeup dataset contains %d files.' % makeup_count)
 
 3026     tmptar = 
'%s/work.tar' % tmpworkdir
 
 3027     jobinfo = subprocess.Popen([
'tar',
'-cf', tmptar, 
'-C', tmpworkdir,
 
 3028                                 '--mtime=2018-01-01',
 
 3029                                 '--exclude=work.tar', 
'.'],
 
 3030                                stdout=subprocess.PIPE,
 
 3031                                stderr=subprocess.PIPE)
 
 3032     jobout, joberr = jobinfo.communicate()
 
 3035         raise RuntimeError(
'Failed to create work tarball in %s' % tmpworkdir)
 
 3039     hasher = hashlib.md5()
 
 3040     f = 
open(tmptar, 
'rb')
 
 3045     hash = hasher.hexdigest()
 
 3052     hashtar = 
'%s/work%s.tar' % (stage.workdir, hash)
 
 3053     if not larbatch_posix.exists(hashtar):
 
 3054         larbatch_posix.copy(tmptar, hashtar)
 
 3055     jobsub_workdir_files_args.extend([
'-f', hashtar])
 
 3062     inputdef = stage.inputdef
 
 3063     if makeup 
and makeup_defname != 
'':
 
 3064         inputdef = makeup_defname
 
 3071         project_utilities.test_kca()
 
 3072         prjname = samweb.makeProjectName(inputdef)
 
 3077     if stage.mixinputdef != 
'':
 
 3079         project_utilities.test_kca()
 
 3080         mixprjname = 
'mix_%s' % samweb.makeProjectName(stage.mixinputdef)
 
 3085     if prjname != 
'' and stage.prestart != 0:
 
 3086         ok = project_utilities.start_project(inputdef, prjname,
 
 3087                                              stage.num_jobs * stage.max_files_per_job,
 
 3088                                              stage.recur, stage.filelistdef)
 
 3090             print(
'Failed to start project.')
 
 3096     if mixprjname != 
'' and prj_started:
 
 3097         ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
 
 3099             print(
'Failed to start mix project.')
 
 3104     role = project_utilities.get_role()
 
 3105     if project.role != 
'':
 
 3110     command = [
'jobsub_submit']
 
 3115     command.append(
'--group=%s' % project_utilities.get_experiment())
 
 3116     command.append(
'--role=%s' % role)
 
 3117     command.extend(jobsub_workdir_files_args)
 
 3118     if project.server != 
'-' and project.server != 
'':
 
 3119         command.append(
'--jobsub-server=%s' % project.server)
 
 3120     if stage.resource != 
'':
 
 3121         command.append(
'--resource-provides=usage_model=%s' % stage.resource)
 
 3122     elif project.resource != 
'':
 
 3123         command.append(
'--resource-provides=usage_model=%s' % project.resource)
 
 3124     if stage.lines != 
'':
 
 3125         command.append(
'--lines=%s' % stage.lines)
 
 3126     elif project.lines != 
'':
 
 3127         command.append(
'--lines=%s' % project.lines)
 
 3128     if stage.site != 
'':
 
 3129         command.append(
'--site=%s' % stage.site)
 
 3130     if stage.blacklist != 
'':
 
 3131         command.append(
'--blacklist=%s' % stage.blacklist)
 
 3133         command.append(
'--cpu=%d' % stage.cpu)
 
 3134     if stage.disk != 
'':
 
 3135         command.append(
'--disk=%s' % stage.disk)
 
 3136     if stage.memory != 0:
 
 3137         command.append(
'--memory=%d' % stage.memory)
 
 3138     if project.os != 
'':
 
 3139         if stage.singularity == 0:
 
 3140             command.append(
'--OS=%s' % project.os)
 
 3142             p = project_utilities.get_singularity(project.os)
 
 3144                 if (stage.num_jobs > 1 
or project.force_dag) 
and \
 
 3145                    (inputdef != 
'' or stage.mixinputdef != 
'') :
 
 3146                     command.append(
r"""--lines='+SingularityImage=\"%s\"'""" % p)
 
 3148                     command.append(
r"""--lines='+SingularityImage="%s"'""" % p)
 
 3150                 raise RuntimeError(
'No singularity image found for %s' % project.os)
 
 3151     if not stage.pubs_output:
 
 3153             command_njobs = stage.num_jobs
 
 3154             command.extend([
'-N', 
'%d' % command_njobs])
 
 3156             command_njobs = min(makeup_count, stage.num_jobs)
 
 3157             command.extend([
'-N', 
'%d' % command_njobs])
 
 3159         if stage.inputdef != 
'':
 
 3160             command_njobs = stage.num_jobs
 
 3162             command_njobs = stage.num_jobs
 
 3163             command.extend([
'-N', 
'%d' % command_njobs])
 
 3164     if stage.jobsub != 
'':
 
 3165         for word 
in stage.jobsub.split():
 
 3166             command.append(word)
 
 3167     opt = project_utilities.default_jobsub_submit_options()
 
 3169         for word 
in opt.split():
 
 3170             command.append(word)
 
 3171     if stage.cvmfs != 0:
 
 3172         command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
 
 3173     if stage.stash != 0:
 
 3174         command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
 
 3175     if stage.singularity != 0:
 
 3176         command.append(
'--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
 
 3180     workurl = 
"file://%s" % workscript
 
 3181     command.append(workurl)
 
 3186     if stage.max_files_per_job != 0:
 
 3187         command_max_files_per_job = stage.max_files_per_job
 
 3188         command.extend([
'--nfile', 
'%d' % command_max_files_per_job])
 
 3193     command.extend([
' --group', project_utilities.get_experiment()])
 
 3194     command.extend([
' -g'])
 
 3195     command.extend([
' -c', 
'wrapper.fcl'])
 
 3196     command.extend([
' --ups', 
','.
join(project.ups)])
 
 3197     if project.release_tag != 
'':
 
 3198         command.extend([
' -r', project.release_tag])
 
 3199     command.extend([
' -b', project.release_qual])
 
 3200     if project.local_release_dir != 
'':
 
 3201         command.extend([
' --localdir', project.local_release_dir])
 
 3202     if project.local_release_tar != 
'':
 
 3203         command.extend([
' --localtar', project.local_release_tar])
 
 3204     command.extend([
' --workdir', stage.workdir])
 
 3205     command.extend([
' --outdir', stage.outdir])
 
 3206     command.extend([
' --logdir', stage.logdir])
 
 3207     if stage.dirsize > 0:
 
 3208         command.extend([
' --dirsize', 
'%d' % stage.dirsize])
 
 3209     if stage.dirlevels > 0:
 
 3210         command.extend([
' --dirlevels', 
'%d' % stage.dirlevels])
 
 3213             command.extend([
' --exe', 
':'.
join(stage.exe)])
 
 3215             command.extend([
' --exe', stage.exe])
 
 3216     if stage.schema != 
'':
 
 3217         command.extend([
' --sam_schema', stage.schema])
 
 3218     if project.os != 
'':
 
 3219         command.extend([
' --os', project.os])
 
 3223     if not stage.pubs_input 
and stage.pubs_output 
and stage.output_subruns[0] > 0:
 
 3224         command.extend([
'--process', 
'%d' % (stage.output_subruns[0]-1)])
 
 3229         command.append(
'--single')
 
 3231     if stage.inputfile != 
'':
 
 3232         command.extend([
' -s', stage.inputfile])
 
 3233     elif input_list_name != 
'':
 
 3234         command.extend([
' -S', input_list_name])
 
 3235     elif inputdef != 
'':
 
 3236         command.extend([
' --sam_defname', inputdef,
 
 3237                         ' --sam_project', prjname])
 
 3239         command.extend([
' --recur'])
 
 3240     if stage.mixinputdef != 
'':
 
 3241         command.extend([
' --mix_defname', stage.mixinputdef,
 
 3242                         ' --mix_project', mixprjname])
 
 3243     if stage.inputmode != 
'':
 
 3244         command.extend([
' --inputmode', stage.inputmode])
 
 3245     command.extend([
' -n', 
'%d' % stage.num_events])
 
 3246     if stage.inputdef == 
'':
 
 3247         command.extend([
' --njobs', 
'%d' % stage.num_jobs ])
 
 3248     for ftype 
in stage.datafiletypes:
 
 3249         command.extend([
'--data_file_type', ftype])
 
 3251         command.extend([
' --procmap', procmap])
 
 3254             command.extend([
' --output', 
':'.
join(stage.output)])
 
 3256             command.extend([
' --output', stage.output])
 
 3257     if stage.TFileName != 
'':
 
 3258         command.extend([
' --TFileName', stage.TFileName])
 
 3259     if stage.init_script != 
'':
 
 3260         command.extend([
' --init-script', os.path.basename(stage.init_script)])
 
 3261     if stage.init_source != 
'':
 
 3262         command.extend([
' --init-source', os.path.basename(stage.init_source)])
 
 3263     if stage.end_script != 
'':
 
 3264         command.extend([
' --end-script', os.path.basename(stage.end_script)])
 
 3265     if stage.mid_source != 
'':
 
 3266         command.extend([
' --mid-source', os.path.basename(stage.mid_source)])
 
 3267     if stage.mid_script != 
'':
 
 3268         command.extend([
' --mid-script', os.path.basename(stage.mid_script)])
 
 3269     if abssetupscript != 
'':
 
 3270         command.extend([
' --init', abssetupscript])
 
 3274     if stage.validate_on_worker == 1:
 
 3275       print(
'Validation will be done on the worker node %d' % stage.validate_on_worker)
 
 3276       command.extend([
' --validate'])
 
 3277       command.extend([
' --declare'])
 
 3279       if type(stage.fclname) == 
type([]) 
and len(stage.fclname) > 1:
 
 3280         command.extend([
' --maintain_parentage'])
 
 3282     if stage.copy_to_fts == 1:
 
 3283       command.extend([
' --copy'])
 
 3287     if (prjname != 
'' or mixprjname != 
'') 
and command_njobs == 1 
and not project.force_dag 
and not prj_started:
 
 3288         command.extend([
' --sam_start',
 
 3289                         ' --sam_station', project_utilities.get_experiment(),
 
 3290                         ' --sam_group', project_utilities.get_experiment()])
 
 3299     if command_njobs > 1 
or project.force_dag:
 
 3301             dag_prjs.append([inputdef, prjname])
 
 3302         if stage.mixinputdef != 
'':
 
 3303             dag_prjs.append([stage.mixinputdef, mixprjname])
 
 3305     for dag_prj 
in dag_prjs:
 
 3310         if workstartname == 
'' or workstopname == 
'':
 
 3311             raise RuntimeError(
'Sam start or stop project script not found.')
 
 3315         start_command = [
'jobsub']
 
 3319         start_command.append(
'--group=%s' % project_utilities.get_experiment())
 
 3320         if setupscript != 
'':
 
 3321             start_command.append(
'-f %s' % setupscript)
 
 3323         if stage.resource != 
'':
 
 3324             start_command.append(
'--resource-provides=usage_model=%s' % stage.resource)
 
 3325         elif project.resource != 
'':
 
 3326             start_command.append(
'--resource-provides=usage_model=%s' % project.resource)
 
 3327         if stage.lines != 
'':
 
 3328             start_command.append(
'--lines=%s' % stage.lines)
 
 3329         elif project.lines != 
'':
 
 3330             start_command.append(
'--lines=%s' % project.lines)
 
 3331         if stage.site != 
'':
 
 3332             start_command.append(
'--site=%s' % stage.site)
 
 3333         if stage.blacklist != 
'':
 
 3334             start_command.append(
'--blacklist=%s' % stage.blacklist)
 
 3335         if project.os != 
'':
 
 3336             if stage.singularity == 0:
 
 3337                 start_command.append(
'--OS=%s' % project.os)
 
 3339                 p = project_utilities.get_singularity(project.os)
 
 3341                     start_command.append(
'--lines=\'+SingularityImage=\\"%s\\"\'' % p)
 
 3343                     raise RuntimeError(
'No singularity image found for %s' % project.os)
 
 3344         if stage.jobsub_start != 
'':
 
 3345             for word 
in stage.jobsub_start.split():
 
 3346                 start_command.append(word)
 
 3347         opt = project_utilities.default_jobsub_submit_options()
 
 3349             for word 
in opt.split():
 
 3350                 start_command.append(word)
 
 3351         if stage.cvmfs != 0:
 
 3352             start_command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
 
 3353         if stage.stash != 0:
 
 3354             start_command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
 
 3355         if stage.singularity != 0:
 
 3356             start_command.append(
'--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
 
 3360         workstarturl = 
"file://%s" % workstartscript
 
 3361         start_command.append(workstarturl)
 
 3365         start_command.extend([
' --sam_station', project_utilities.get_experiment(),
 
 3366                               ' --sam_group', project_utilities.get_experiment(),
 
 3367                               ' --sam_defname', dag_prj[0],
 
 3368                               ' --sam_project', dag_prj[1],
 
 3371             start_command.extend([
' --recur'])
 
 3373         if abssetupscript != 
'':
 
 3374             start_command.extend([
' --init', abssetupscript])
 
 3376         if stage.num_jobs > 0 
and stage.max_files_per_job > 0:
 
 3377             start_command.extend([
' --max_files', 
'%d' % (stage.num_jobs * stage.max_files_per_job)])
 
 3379         if stage.prestagefraction > 0.:
 
 3380             start_command.extend([
' --prestage_fraction', 
'%f' % stage.prestagefraction])
 
 3384         start_command.extend([
' --logdir', stage.logdir])
 
 3388         if not prj_started 
or stage.prestagefraction > 0.:
 
 3389             start_commands.append(start_command)
 
 3393         stop_command = [
'jobsub']
 
 3397         stop_command.append(
'--group=%s' % project_utilities.get_experiment())
 
 3398         if setupscript != 
'':
 
 3399             stop_command.append(
'-f %s' % setupscript)
 
 3401         if stage.resource != 
'':
 
 3402             stop_command.append(
'--resource-provides=usage_model=%s' % stage.resource)
 
 3403         elif project.resource != 
'':
 
 3404             stop_command.append(
'--resource-provides=usage_model=%s' % project.resource)
 
 3405         if stage.lines != 
'':
 
 3406             stop_command.append(
'--lines=%s' % stage.lines)
 
 3407         elif project.lines != 
'':
 
 3408             stop_command.append(
'--lines=%s' % project.lines)
 
 3409         if stage.site != 
'':
 
 3410             stop_command.append(
'--site=%s' % stage.site)
 
 3411         if stage.blacklist != 
'':
 
 3412             stop_command.append(
'--blacklist=%s' % stage.blacklist)
 
 3413         if project.os != 
'':
 
 3414             if stage.singularity == 0:
 
 3415                 stop_command.append(
'--OS=%s' % project.os)
 
 3417                 p = project_utilities.get_singularity(project.os)
 
 3419                     stop_command.append(
'--lines=\'+SingularityImage=\\"%s\\"\'' % p)
 
 3421                     raise RuntimeError(
'No singularity image found for %s' % project.os)
 
 3422         if stage.jobsub_start != 
'':
 
 3423             for word 
in stage.jobsub_start.split():
 
 3424                 stop_command.append(word)
 
 3425         opt = project_utilities.default_jobsub_submit_options()
 
 3427             for word 
in opt.split():
 
 3428                 stop_command.append(word)
 
 3429         if stage.cvmfs != 0:
 
 3430             stop_command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
 
 3431         if stage.stash != 0:
 
 3432             stop_command.append(
'--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
 
 3433         if stage.singularity != 0:
 
 3434             stop_command.append(
'--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
 
 3438         workstopurl = 
"file://%s" % workstopscript
 
 3439         stop_command.append(workstopurl)
 
 3443         stop_command.extend([
' --sam_station', project_utilities.get_experiment(),
 
 3444                              ' --sam_project', dag_prj[1],
 
 3449         stop_command.extend([
' --logdir', stage.logdir])
 
 3451         if abssetupscript != 
'':
 
 3452             stop_command.extend([
' --init', abssetupscript])
 
 3456         stop_commands.append(stop_command)
 
 3458     if len(start_commands) > 0 
or len(stop_commands) > 0:
 
 3462         dagfilepath = os.path.join(tmpdir, 
'submit.dag')
 
 3464         dag.write(
'<serial>\n')
 
 3468         if len(start_commands) > 0:
 
 3469             dag.write(
'\n<parallel>\n\n')
 
 3470             for start_command 
in start_commands:
 
 3472                 for word 
in start_command:
 
 3476                     if word[:6] == 
'jobsub':
 
 3480             dag.write(
'</parallel>\n')
 
 3484         dag.write(
'\n<parallel>\n\n')
 
 3485         for process 
in range(command_njobs):
 
 3489             for word 
in command:
 
 3499                         if word[:6] == 
'jobsub':
 
 3501                         if word[:7] == 
'--role=':
 
 3503                         if word.startswith(
'--jobsub-server='):
 
 3505                         word = project_utilities.dollar_escape(word)
 
 3507                         if word[:6] == 
'jobsub':
 
 3510             dag.write(
' --process %d\n' % process)
 
 3512         dag.write(
'\n</parallel>\n')
 
 3516         if len(stop_commands) > 0:
 
 3517             dag.write(
'\n<parallel>\n\n')
 
 3518             for stop_command 
in stop_commands:
 
 3520                 for word 
in stop_command:
 
 3524                     if word[:6] == 
'jobsub':
 
 3528             dag.write(
'</parallel>\n')
 
 3532         dag.write(
'\n</serial>\n')
 
 3537         command = [
'jobsub_submit_dag']
 
 3538         command.append(
'--group=%s' % project_utilities.get_experiment())
 
 3539         if project.server != 
'-' and project.server != 
'':
 
 3540             command.append(
'--jobsub-server=%s' % project.server)
 
 3541         command.append(
'--role=%s' % role)
 
 3542         dagfileurl = 
'file://'+ dagfilepath
 
 3543         command.append(dagfileurl)
 
 3545     checked_file = os.path.join(stage.bookdir, 
'checked')
 
 3549     submit_timeout = 3600000
 
 3551         submit_timeout += 1.0 * command_njobs
 
 3552     if stage.jobsub_timeout > submit_timeout:
 
 3553         submit_timeout = stage.jobsub_timeout
 
 3561         print(
'Invoke jobsub_submit')
 
 3566             jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
 3567             thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
 
 3569             thread.join(timeout=submit_timeout)
 
 3570             if thread.is_alive():
 
 3576             if larbatch_posix.exists(checked_file):
 
 3577                 larbatch_posix.remove(checked_file)
 
 3578             if larbatch_posix.isdir(tmpdir):
 
 3579                 larbatch_posix.rmtree(tmpdir)
 
 3580             if larbatch_posix.isdir(tmpworkdir):
 
 3581                 larbatch_posix.rmtree(tmpworkdir)
 
 3583                 raise JobsubError(command, rc, jobout, joberr)
 
 3584             for line 
in jobout.split(
'\n'):
 
 3585                 if "JobsubJobId" in line:
 
 3586                     jobid = line.strip().split()[-1]
 
 3588                 raise JobsubError(command, rc, jobout, joberr)
 
 3589         print(
'jobsub_submit finished.')
 
 3595         if makeup_count > 0:
 
 3600                 jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
 3601                 thread = threading.Thread(target=project_utilities.wait_for_subprocess,
 
 3604                 thread.join(timeout=submit_timeout)
 
 3605                 if thread.is_alive():
 
 3611                 if larbatch_posix.exists(checked_file):
 
 3612                     larbatch_posix.remove(checked_file)
 
 3613                 if larbatch_posix.isdir(tmpdir):
 
 3614                     larbatch_posix.rmtree(tmpdir)
 
 3615                 if larbatch_posix.isdir(tmpworkdir):
 
 3616                     larbatch_posix.rmtree(tmpworkdir)
 
 3618                     raise JobsubError(command, rc, jobout, joberr)
 
 3619                 for line 
in jobout.split(
'\n'):
 
 3620                     if "JobsubJobId" in line:
 
 3621                         jobid = line.strip().split()[-1]
 
 3623                     raise JobsubError(command, rc, jobout, joberr)
 
 3625             print(
'Makeup action aborted because makeup job count is zero.')
 
 3634 def dosubmit(project, stage, makeup=False, recur=False, dryrun=False):
 
 3638     project_utilities.test_kca()
 
 3642     larbatch_utilities.test_jobsub()
 
 3646     ok = stage.checksubmit()
 
 3648         print(
'No jobs submitted.')
 
 3655     if stage.pubs_output 
and not stage.dynamic:
 
 3656         if larbatch_posix.exists(stage.workdir):
 
 3657             larbatch_posix.rmtree(stage.workdir)
 
 3658         if larbatch_posix.exists(stage.outdir):
 
 3659             larbatch_posix.rmtree(stage.outdir)
 
 3660         if larbatch_posix.exists(stage.logdir):
 
 3661             larbatch_posix.rmtree(stage.logdir)
 
 3662         if larbatch_posix.exists(stage.bookdir):
 
 3663             larbatch_posix.rmtree(stage.bookdir)
 
 3674     ok = stage.checkinput(checkdef=
True)
 
 3676         print(
'No jobs submitted.')
 
 3681     if not makeup 
and not recur 
and not stage.dynamic:
 
 3682         if len(larbatch_posix.listdir(stage.outdir)) != 0:
 
 3683             raise RuntimeError(
'Output directory %s is not empty.' % stage.outdir)
 
 3684         if len(larbatch_posix.listdir(stage.logdir)) != 0:
 
 3685             raise RuntimeError(
'Log directory %s is not empty.' % stage.logdir)
 
 3686         if len(larbatch_posix.listdir(stage.bookdir)) != 0:
 
 3687             raise RuntimeError(
'Log directory %s is not empty.' % stage.bookdir)
 
 3691     jobid = 
dojobsub(project, stage, makeup, recur, dryrun)
 
 3695     jobids_filename = os.path.join(stage.bookdir, 
'jobids.list')
 
 3697     if larbatch_posix.exists(jobids_filename):
 
 3698         lines = larbatch_posix.readlines(jobids_filename)
 
 3704         jobids.append(jobid)
 
 3706     jobid_file = 
safeopen(jobids_filename)
 
 3707     for jobid 
in jobids:
 
 3708         jobid_file.write(
'%s\n' % jobid)
 
 3724     hnlist = os.path.join(stage.bookdir, 
'filesana.list')
 
 3725     if larbatch_posix.exists(hnlist):
 
 3726         hlist = larbatch_posix.readlines(hnlist)
 
 3728         raise RuntimeError(
'No filesana.list file found %s, run project.py --checkana' % hnlist)
 
 3730     histurlsname_temp = 
'histurls.list' 
 3731     histurls = 
safeopen(histurlsname_temp)
 
 3734         histurls.write(
'%s\n' % hist)
 
 3738         name = os.path.join(stage.outdir, 
'anahist.root')
 
 3739         if name[0:6] == 
'/pnfs/':
 
 3740             tempdir = 
'%s/mergentuple_%d_%d' % (project_utilities.get_scratch_dir(),
 
 3743             if not larbatch_posix.isdir(tempdir):
 
 3744                 larbatch_posix.makedirs(tempdir)
 
 3745             name_temp = 
'%s/anahist.root' % tempdir
 
 3750             mergecom = 
"hadd -T" 
 3754             mergecom = stage.merge
 
 3756         print(
"Merging %d root files using %s." % (len(hlist), mergecom))
 
 3758         if larbatch_posix.exists(name_temp):
 
 3759             larbatch_posix.remove(name_temp)
 
 3760         comlist = mergecom.split()
 
 3761         comlist.extend([
"-f", 
"-k", name_temp, 
'@' + histurlsname_temp])
 
 3762         rc = subprocess.call(comlist, stdout=sys.stdout, stderr=sys.stderr)
 
 3764             print(
"%s exit status %d" % (mergecom, rc))
 
 3765         if name != name_temp:
 
 3766             if larbatch_posix.exists(name):
 
 3767                 larbatch_posix.remove(name)
 
 3768             if larbatch_posix.exists(name_temp):
 
 3771                 larbatch_posix.copy(name_temp, name)
 
 3772                 larbatch_posix.rmtree(tempdir)
 
 3773         larbatch_posix.remove(histurlsname_temp)
 
 3781     stage_has_input = stage.inputfile != 
'' or stage.inputlist != 
'' or stage.inputdef != 
'' 
 3782     if not stage_has_input:
 
 3783         raise RuntimeError(
'No auditing for generator stage.')
 
 3789     if stage.defname != 
'':
 
 3790         query = 
'isparentof: (defname: %s) and availability: anylocation' %(stage.defname)
 
 3792             outparentlist = samweb.listFiles(dimensions=query)
 
 3793             outputlist = samweb.listFiles(defname=stage.defname)
 
 3795             raise RuntimeError(
'Error accessing sam information for definition %s.\nDoes definition exist?' % stage.defname)
 
 3797         raise RuntimeError(
'Output definition not found.')
 
 3803     if stage.inputdef != 
'':
 
 3805         inputlist=samweb.listFiles(defname=stage.inputdef)
 
 3806     elif stage.inputlist != 
'':
 
 3808         if larbatch_posix.exists(stage.inputlist):
 
 3809             ilist = larbatch_posix.readlines(stage.inputlist)
 
 3812                 inputlist.append(os.path.basename(i.strip()))
 
 3814         raise RuntimeError(
'Input definition and/or input list does not exist.')
 
 3816     difflist = set(inputlist)^set(outparentlist)
 
 3819     for item 
in difflist:
 
 3820         if item 
in inputlist:
 
 3823                 missingfilelistname = os.path.join(stage.bookdir, 
'missingfiles.list')
 
 3824                 missingfilelist = 
safeopen(missingfilelistname)
 
 3826                     missingfilelist.write(
"%s\n" %item)
 
 3827         elif item 
in outparentlist:
 
 3829             childcmd = 
'samweb list-files "ischildof: (file_name=%s) and availability: physical"' %(item)
 
 3830             children = 
convert_str(subprocess.check_output(childcmd, shell=
True)).splitlines()
 
 3831             rmfile = 
list(set(children) & set(outputlist))[0]
 
 3834                 fnlist = os.path.join(stage.bookdir, 
'files.list')
 
 3835                 if larbatch_posix.exists(fnlist):
 
 3836                     flist = larbatch_posix.readlines(fnlist)
 
 3839                         slist.append(line.split()[0])
 
 3841                     raise RuntimeError(
'No files.list file found %s, run project.py --check' % fnlist)
 
 3845             sdict = {
'content_status':
'bad'}
 
 3846             project_utilities.test_kca()
 
 3847             samweb.modifyFileMetadata(rmfile, sdict)
 
 3848             print(
'\nDeclaring the status of the following file as bad:', rmfile)
 
 3853             fn = [x 
for x 
in slist 
if os.path.basename(x.strip()) != rmfile]
 
 3856                 thefile.write(
"%s\n" % item)
 
 3859         print(
"Everything in order.")
 
 3862         print(
'Missing parent file(s) = ', mc)
 
 3863         print(
'Extra parent file(s) = ',me)
 
 3866         missingfilelist.close()
 
 3867         print(
"Creating missingfiles.list in the output directory....done!")
 
 3871         print(
"For extra parent files, files.list redefined and content status declared as bad in SAM...done!")
 
 3878     filename = sys.argv[0]
 
 3879     file = 
open(filename, 
'r') 
 3883     for line 
in file.readlines():
 
 3884         if line[2:12] == 
'project.py':
 
 3886         elif line[0:6] == 
'######' and doprint:
 
 3890                 print(line[2:], end=
' ')
 
 3910     normxmlfile = xmlfile
 
 3914     if xmlfile.find(
':') < 0 
and \
 
 3915        not xmlfile.startswith(
'/') 
and \
 
 3916        not xmlfile.startswith(
'./') 
and \
 
 3917        not xmlfile.startswith(
'../') 
and \
 
 3923         dirs = [os.getcwd()]
 
 3927         if 'XMLPATH' in os.environ:
 
 3928             dirs.extend(os.environ[
'XMLPATH'].split(
':'))
 
 3933             xmlpath = os.path.join(dir, xmlfile)
 
 3934             if os.path.exists(xmlpath):
 
 3935                 normxmlfile = xmlpath
 
 3946     filename = sys.argv[0]
 
 3947     file = 
open(filename, 
'r') 
 3951     for line 
in file.readlines():
 
 3952         if line[2:20] == 
'XML file structure':
 
 3954         elif line[0:6] == 
'######' and doprint:
 
 3958                 print(line[2:], end=
' ')
 
 4014     check_declarations = 0
 
 4015     check_declarations_ana = 0
 
 4016     test_declarations = 0
 
 4017     test_declarations_ana = 0
 
 4018     check_definition = 0
 
 4019     check_definition_ana = 0
 
 4021     test_definition_ana = 0
 
 4023     add_locations_ana = 0
 
 4025     check_locations_ana = 0
 
 4031     clean_locations_ana = 0
 
 4032     remove_locations = 0
 
 4033     remove_locations_ana = 0
 
 4036     while len(args) > 0:
 
 4037         if args[0] == 
'-h' or args[0] == 
'--help' :
 
 4040         elif args[0] == 
'-xh' or args[0] == 
'--xmlhelp' :
 
 4043         elif args[0] == 
'--xml' and len(args) > 1:
 
 4046         elif args[0] == 
'--project' and len(args) > 1:
 
 4047             projectname = args[1]
 
 4049         elif args[0] == 
'--stage' and len(args) > 1:
 
 4050             stagenames = args[1].split(
',')
 
 4052         elif args[0] == 
'--tmpdir' and len(args) > 1:
 
 4053             os.environ[
'TMPDIR'] = args[1]
 
 4055         elif args[0] == 
'--lines' and len(args) > 1:
 
 4058         elif args[0] == 
'--site' and len(args) > 1:
 
 4061         elif args[0] == 
'--cpu' and len(args) > 1:
 
 4064         elif args[0] == 
'--disk' and len(args) > 1:
 
 4067         elif args[0] == 
'--memory' and len(args) > 1:
 
 4068             memory = int(args[1])
 
 4070         elif args[0] == 
'--inputdef' and len(args) > 1:
 
 4073         elif args[0] == 
'--submit':
 
 4076         elif args[0] == 
'--recur':
 
 4079         elif args[0] == 
'--pubs' and len(args) > 2:
 
 4081             pubs_run = int(args[1])
 
 4082             pubs_subruns = project_utilities.parseInt(args[2])
 
 4084             if len(args) > 0 
and args[0] != 
'' and args[0][0] != 
'-':
 
 4085                 pubs_version = int(args[0])
 
 4087         elif args[0] == 
'--check':
 
 4090         elif args[0] == 
'--checkana':
 
 4093         elif args[0] == 
'--shorten':
 
 4096         elif args[0] == 
'--fetchlog':
 
 4099         elif args[0] == 
'--merge':
 
 4102         elif args[0] == 
'--mergehist':
 
 4105         elif args[0] == 
'--mergentuple':
 
 4108         elif args[0] == 
'--audit':
 
 4111         elif args[0] == 
'--status':
 
 4114         elif args[0] == 
'--makeup':
 
 4117         elif args[0] == 
'--clean':
 
 4120         elif args[0] == 
'--clean_one':
 
 4123         elif args[0] == 
'--dump_project':
 
 4126         elif args[0] == 
'--dump_stage':
 
 4129         elif args[0] == 
'--dryrun':
 
 4132         elif args[0] == 
'--nocheck':
 
 4135         elif args[0] == 
'--outdir':
 
 4138         elif args[0] == 
'--logdir':
 
 4141         elif args[0] == 
'--workdir':
 
 4144         elif args[0] == 
'--bookdir':
 
 4147         elif args[0] == 
'--fcl':
 
 4150         elif args[0] == 
'--defname':
 
 4153         elif args[0] == 
'--input_files':
 
 4156         elif args[0] == 
'--check_submit':
 
 4159         elif args[0] == 
'--check_input':
 
 4162         elif args[0] == 
'--declare':
 
 4165         elif args[0] == 
'--declare_ana':
 
 4168         elif args[0] == 
'--define':
 
 4171         elif args[0] == 
'--define_ana':
 
 4174         elif args[0] == 
'--undefine':
 
 4177         elif args[0] == 
'--check_declarations':
 
 4178             check_declarations = 1
 
 4180         elif args[0] == 
'--check_declarations_ana':
 
 4181             check_declarations_ana = 1
 
 4183         elif args[0] == 
'--test_declarations':
 
 4184             test_declarations = 1
 
 4186         elif args[0] == 
'--test_declarations_ana':
 
 4187             test_declarations_ana = 1
 
 4189         elif args[0] == 
'--check_definition':
 
 4190             check_definition = 1
 
 4192         elif args[0] == 
'--check_definition_ana':
 
 4193             check_definition_ana = 1
 
 4195         elif args[0] == 
'--test_definition':
 
 4198         elif args[0] == 
'--test_definition_ana':
 
 4199             test_definition_ana = 1
 
 4201         elif args[0] == 
'--add_locations':
 
 4204         elif args[0] == 
'--add_locations_ana':
 
 4205             add_locations_ana = 1
 
 4207         elif args[0] == 
'--check_locations':
 
 4210         elif args[0] == 
'--check_locations_ana':
 
 4211             check_locations_ana = 1
 
 4213         elif args[0] == 
'--upload':
 
 4216         elif args[0] == 
'--upload_ana':
 
 4219         elif args[0] == 
'--check_tape':
 
 4222         elif args[0] == 
'--check_tape_ana':
 
 4225         elif args[0] == 
'--clean_locations':
 
 4228         elif args[0] == 
'--clean_locations_ana':
 
 4229             clean_locations_ana = 1
 
 4231         elif args[0] == 
'--remove_locations':
 
 4232             remove_locations = 1
 
 4234         elif args[0] == 
'--remove_locations_ana':
 
 4235             remove_locations_ana = 1
 
 4238             print(
'Unknown option %s' % args[0])
 
 4248         print(
'No xml file specified.  Type "project.py -h" for help.')
 
 4254     num_action = submit + check + checkana + fetchlog + merge + mergehist + mergentuple + audit + stage_status + makeup + define + define_ana + undefine + declare + declare_ana
 
 4256         print(
'More than one action was specified.')
 
 4265     for stagename 
in stagenames:
 
 4268             if projectname == 
'':
 
 4269                 projectname = project.name
 
 4271             raise RuntimeError(
'No project selected.\n')
 
 4276         for stagename 
in stagenames:
 
 4277             docleanx(projects, projectname, stagename, clean_descendants = 
True)
 
 4282         for stagename 
in stagenames:
 
 4283             docleanx(projects, projectname, stagename, clean_descendants = 
False)
 
 4295     for stagename 
in stagenames:
 
 4296         stage = project.get_stage(stagename)
 
 4297         stages[stagename] = stage
 
 4310             stage.memory = memory
 
 4312             stage.inputdef = inputdef
 
 4313             stage.inputfile = 
'' 
 4314             stage.inputlist = 
'' 
 4321             stage.pubsify_input(pubs_run, pubs_subruns, pubs_version)
 
 4322             stage.pubsify_output(pubs_run, pubs_subruns, pubs_version)
 
 4326         if stage.recur 
and stage.inputdef != 
'' and stage.basedef != 
'':
 
 4333                 desc = samweb.descDefinition(defname=stage.inputdef)
 
 4335             except samweb_cli.exceptions.DefinitionNotFound:
 
 4342                 project_utilities.test_kca()
 
 4350                 project_wildcard = 
'%s_%%' % samweb.makeProjectName(stage.inputdef).rsplit(
'_',1)[0]
 
 4351                 if stage.recurtype == 
'snapshot':
 
 4352                     dim = 
'defname: %s minus snapshot_for_project_name %s' % \
 
 4353                         (stage.basedef, project_wildcard)
 
 4354                 elif stage.recurtype == 
'consumed':
 
 4355                     dim = 
'defname: %s minus (project_name %s and consumed_status consumed)' % \
 
 4356                         (stage.basedef, project_wildcard)
 
 4358                 elif stage.recurtype == 
'child':
 
 4364                     if stage.data_stream != 
None and len(stage.data_stream) > 0:
 
 4365                         nstream = len(stage.data_stream)
 
 4368                     for istream 
in range(nstream):
 
 4369                         idim = project_utilities.dimensions_datastream(project, stage, 
 
 4370                                                                        ana=
False, index=istream)
 
 4371                         if idim.find(
'anylocation') > 0:
 
 4372                             idim = idim.replace(
'anylocation', 
'physical')
 
 4374                             idim += 
' with availability physical' 
 4378                         dim += 
'(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
 
 4380                     if stage.activebase != 
'':
 
 4381                         activedef = 
'%s_active' % stage.activebase
 
 4382                         waitdef = 
'%s_wait' % stage.activebase
 
 4383                         dim += 
' minus defname: %s' % activedef
 
 4384                         dim += 
' minus defname: %s' % waitdef
 
 4385                         project_utilities.makeDummyDef(activedef)
 
 4386                         project_utilities.makeDummyDef(waitdef)
 
 4388                 elif stage.recurtype == 
'anachild':
 
 4394                     if stage.ana_data_stream != 
None and len(stage.ana_data_stream) > 0:
 
 4395                         nstream = len(stage.ana_data_stream)
 
 4398                     for istream 
in range(nstream):
 
 4399                         idim = project_utilities.dimensions_datastream(project, stage, 
 
 4400                                                                        ana=
True, index=istream)
 
 4401                         if idim.find(
'anylocation') > 0:
 
 4402                             idim = idim.replace(
'anylocation', 
'physical')
 
 4404                             idim += 
' with availability physical' 
 4408                         dim += 
'(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
 
 4410                     if stage.activebase != 
'':
 
 4411                         activedef = 
'%s_active' % stage.activebase
 
 4412                         waitdef = 
'%s_wait' % stage.activebase
 
 4413                         dim += 
' minus defname: %s' % activedef
 
 4414                         dim += 
' minus defname: %s' % waitdef
 
 4415                         project_utilities.makeDummyDef(activedef)
 
 4416                         project_utilities.makeDummyDef(waitdef)
 
 4418                 elif stage.recurtype != 
'' and stage.recurtype != 
'none':
 
 4419                     raise RuntimeError(
'Unknown recursive type %s.' % stage.recurtype)
 
 4423                 if stage.recurlimit != 0:
 
 4424                     dim += 
' with limit %d' % stage.recurlimit
 
 4428                 print(
'Creating recursive dataset definition %s' % stage.inputdef)
 
 4429                 project_utilities.test_kca()
 
 4430                 samweb.createDefinition(defname=stage.inputdef, dims=dim)
 
 4436         for stagename 
in stagenames:
 
 4437             print(
'Stage %s:' % stagename)
 
 4438             stage = stages[stagename]
 
 4449         for stagename 
in stagenames:
 
 4450             print(
'Stage %s:' % stagename)
 
 4451             stage = stages[stagename]
 
 4457         for stagename 
in stagenames:
 
 4458             print(
'Stage %s:' % stagename)
 
 4459             stage = stages[stagename]
 
 4465         for stagename 
in stagenames:
 
 4466             print(
'Stage %s:' % stagename)
 
 4467             stage = stages[stagename]
 
 4468             print(stage.workdir)
 
 4473         for stagename 
in stagenames:
 
 4474             print(
'Stage %s:' % stagename)
 
 4475             stage = stages[stagename]
 
 4476             print(stage.bookdir)
 
 4481         for stagename 
in stagenames:
 
 4482             print(
'Stage %s:' % stagename)
 
 4483             stage = stages[stagename]
 
 4484             if stage.defname != 
'':
 
 4485                 print(stage.defname)
 
 4490         for stagename 
in stagenames:
 
 4491             print(
'Stage %s:' % stagename)
 
 4492             stage = stages[stagename]
 
 4494             for input_file 
in input_files:
 
 4500         for stagename 
in stagenames:
 
 4501             print(
'Stage %s:' % stagename)
 
 4502             stage = stages[stagename]
 
 4508         for stagename 
in stagenames:
 
 4509             print(
'Stage %s:' % stagename)
 
 4510             stage = stages[stagename]
 
 4511             stage.checkinput(checkdef=
True)
 
 4516         for stagename 
in stagenames:
 
 4517             print(
'Stage %s:' % stagename)
 
 4518             stage = stages[stagename]
 
 4525     if submit 
or makeup:
 
 4529         for stagename 
in stagenames:
 
 4530             print(
'Stage %s:' % stagename)
 
 4532             if project_utilities.check_running(xmlfile, stagename):
 
 4533                 print(
'Skipping job submission because similar job submission process is running.')
 
 4535                 stage = stages[stagename]
 
 4536                 dosubmit(project, stage, makeup, stage.recur, dryrun)
 
 4538     if check 
or checkana:
 
 4542         for stagename 
in stagenames:
 
 4543             print(
'Stage %s:' % stagename)
 
 4544             stage = stages[stagename]
 
 4545             docheck(project, stage, checkana 
or stage.ana, stage.validate_on_worker)
 
 4551         for stagename 
in stagenames:
 
 4552             print(
'Stage %s:' % stagename)
 
 4553             stage = stages[stagename]
 
 4556     if mergehist 
or mergentuple 
or merge:
 
 4561         for stagename 
in stagenames:
 
 4562             print(
'Stage %s:' % stagename)
 
 4563             stage = stages[stagename]
 
 4564             domerge(stage, mergehist, mergentuple)
 
 4570         for stagename 
in stagenames:
 
 4571             print(
'Stage %s:' % stagename)
 
 4572             stage = stages[stagename]
 
 4575     if check_definition 
or define:
 
 4579         for stagename 
in stagenames:
 
 4580             print(
'Stage %s:' % stagename)
 
 4581             stage = stages[stagename]
 
 4583                 if stage.ana_defname == 
'':
 
 4584                     print(
'No sam analysis dataset definition name specified for this stage.')
 
 4586                 dim = project_utilities.dimensions_datastream(project, stage, ana=
True)
 
 4589                 if stage.defname == 
'':
 
 4590                     print(
'No sam dataset definition name specified for this stage.')
 
 4592                 dim = project_utilities.dimensions_datastream(project, stage, ana=
False)
 
 4595     if check_definition_ana 
or define_ana:
 
 4599         for stagename 
in stagenames:
 
 4600             print(
'Stage %s:' % stagename)
 
 4601             stage = stages[stagename]
 
 4602             if stage.ana_defname == 
'':
 
 4603                 print(
'No sam analysis dataset definition name specified for this stage.')
 
 4605             dim = project_utilities.dimensions_datastream(project, stage, ana=
True)
 
 4612         for stagename 
in stagenames:
 
 4613             print(
'Stage %s:' % stagename)
 
 4614             stage = stages[stagename]
 
 4616                 if stage.ana_defname == 
'':
 
 4617                     print(
'No sam dataset definition name specified for this stage.')
 
 4621                 if stage.defname == 
'':
 
 4622                     print(
'No sam dataset definition name specified for this stage.')
 
 4626     if test_definition_ana:
 
 4630         for stagename 
in stagenames:
 
 4631             print(
'Stage %s:' % stagename)
 
 4632             stage = stages[stagename]
 
 4633             if stage.ana_defname == 
'':
 
 4634                 print(
'No sam dataset definition name specified for this stage.')
 
 4642         for stagename 
in stagenames:
 
 4643             print(
'Stage %s:' % stagename)
 
 4644             stage = stages[stagename]
 
 4645             if stage.defname == 
'':
 
 4646                 print(
'No sam dataset definition name specified for this stage.')
 
 4650     if check_declarations 
or declare:
 
 4654         for stagename 
in stagenames:
 
 4655             print(
'Stage %s:' % stagename)
 
 4656             stage = stages[stagename]
 
 4659     if check_declarations_ana 
or declare_ana:
 
 4663         for stagename 
in stagenames:
 
 4664             print(
'Stage %s:' % stagename)
 
 4665             stage = stages[stagename]
 
 4668     if test_declarations:
 
 4672         for stagename 
in stagenames:
 
 4673             print(
'Stage %s:' % stagename)
 
 4674             stage = stages[stagename]
 
 4675             dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
 
 4678     if test_declarations_ana:
 
 4682         for stagename 
in stagenames:
 
 4683             print(
'Stage %s:' % stagename)
 
 4684             stage = stages[stagename]
 
 4685             dim = project_utilities.dimensions_datastream(project, stage, ana=
True)
 
 4688     if check_locations 
or add_locations 
or clean_locations 
or remove_locations 
or upload:
 
 4692         for stagename 
in stagenames:
 
 4693             print(
'Stage %s:' % stagename)
 
 4694             stage = stages[stagename]
 
 4695             dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
 
 4697                               add_locations, clean_locations, remove_locations,
 
 4700     if check_locations_ana 
or add_locations_ana 
or clean_locations_ana 
or \
 
 4701        remove_locations_ana 
or upload_ana:
 
 4705         for stagename 
in stagenames:
 
 4706             print(
'Stage %s:' % stagename)
 
 4707             stage = stages[stagename]
 
 4708             dim = project_utilities.dimensions_datastream(project, stage, ana=
True)
 
 4710                               add_locations_ana, clean_locations_ana, remove_locations_ana,
 
 4717         for stagename 
in stagenames:
 
 4718             print(
'Stage %s:' % stagename)
 
 4719             stage = stages[stagename]
 
 4720             dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
 
 4727         for stagename 
in stagenames:
 
 4728             print(
'Stage %s:' % stagename)
 
 4729             stage = stages[stagename]
 
 4730             dim = project_utilities.dimensions_datastream(project, stage, ana=
True)
 
 4740     if larbatch_posix.exists(destination):
 
 4741         larbatch_posix.remove(destination)
 
 4742     file = larbatch_posix.open(destination, 
'w')
 
 4753         fileList = project_utilities.saferead(fileName)
 
 4759     if len(fileList) > 0:
 
 4760         for line 
in fileList:
 
 4761             returnArray.append(line.strip())
 
 4770 if __name__ == 
'__main__':
 
 4771     sys.exit(
main(sys.argv))
 
 4774                 inp = open(stage.inputlist,"r") 
 4776                     columns = line.split(
"/")
 
 4777                     columns = [col.strip() 
for col 
in columns]
 
 4778                     inputlist.append(columns[8])
 
then if[["$THISISATEST"==1]]
do one_file $F done echo for F in find $TOP name CMakeLists txt print
S join(S const &sep, Coll const &s)
Returns a concatenation of strings in s separated by sep. 
print OUTPUT<< EOF;< setup name="Default"version="1.0">< worldref="volWorld"/></setup ></gdml > EOF close(OUTPUT)
open(RACETRACK) or die("Could not open file $RACETRACK for writing")