44 default_site, default_blacklist, check=
True):
48 if base_stage !=
None:
49 self.
name = base_stage.name
76 self.
ana = base_stage.ana
190 self.
merge = default_merge
194 self.
site = default_site
196 self.
cpu = default_cpu
197 self.
disk = default_disk
199 self.
memory = default_memory
210 self.
cvmfs = default_cvmfs
211 self.
stash = default_stash
213 self.
script = default_script
221 if 'name' in dict(stage_element.attributes):
222 self.
name = str(stage_element.attributes[
'name'].firstChild.data)
224 raise XMLError(
"Stage name not specified.")
228 batchname_elements = stage_element.getElementsByTagName(
'batchname')
229 if batchname_elements:
230 self.
batchname = str(batchname_elements[0].firstChild.data)
234 fclname_elements = stage_element.getElementsByTagName(
'fcl')
235 if len(fclname_elements) > 0:
237 for fcl
in fclname_elements:
238 self.fclname.append(str(fcl.firstChild.data).
strip())
240 raise XMLError(
'No Fcl names specified for stage %s.' % self.
name)
244 outdir_elements = stage_element.getElementsByTagName(
'outdir')
246 self.
outdir = str(outdir_elements[0].firstChild.data)
248 raise XMLError(
'Output directory not specified for stage %s.' % self.
name)
252 logdir_elements = stage_element.getElementsByTagName(
'logdir')
254 self.
logdir = str(logdir_elements[0].firstChild.data)
260 workdir_elements = stage_element.getElementsByTagName(
'workdir')
262 self.
workdir = str(workdir_elements[0].firstChild.data)
264 raise XMLError(
'Work directory not specified for stage %s.' % self.
name)
268 bookdir_elements = stage_element.getElementsByTagName(
'bookdir')
270 self.
bookdir = str(bookdir_elements[0].firstChild.data)
276 dirsize_elements = stage_element.getElementsByTagName(
'dirsize')
278 self.
dirsize = int(dirsize_elements[0].firstChild.data)
282 dirlevels_elements = stage_element.getElementsByTagName(
'dirlevels')
283 if dirlevels_elements:
284 self.
dirlevels = int(dirlevels_elements[0].firstChild.data)
288 inputfile_elements = stage_element.getElementsByTagName(
'inputfile')
289 if inputfile_elements:
290 self.
inputfile = str(inputfile_elements[0].firstChild.data)
294 inputlist_elements = stage_element.getElementsByTagName(
'inputlist')
295 if inputlist_elements:
296 self.
inputlist = str(inputlist_elements[0].firstChild.data)
300 inputmode_elements = stage_element.getElementsByTagName(
'inputmode')
301 if inputmode_elements:
302 self.
inputmode = str(inputmode_elements[0].firstChild.data)
306 inputdef_elements = stage_element.getElementsByTagName(
'inputdef')
307 if inputdef_elements:
308 self.
inputdef = str(inputdef_elements[0].firstChild.data)
312 ana_elements = stage_element.getElementsByTagName(
'ana')
314 self.
ana = int(ana_elements[0].firstChild.data)
318 recur_elements = stage_element.getElementsByTagName(
'recur')
320 self.
recur = int(recur_elements[0].firstChild.data)
324 recurtype_elements = stage_element.getElementsByTagName(
'recurtype')
325 if recurtype_elements:
326 self.
recurtype = str(recurtype_elements[0].firstChild.data)
330 recurlimit_elements = stage_element.getElementsByTagName(
'recurlimit')
331 if recurlimit_elements:
332 self.
recurlimit = int(recurlimit_elements[0].firstChild.data)
336 recurdef_elements = stage_element.getElementsByTagName(
'recurdef')
337 if recurdef_elements:
339 self.
inputdef = str(recurdef_elements[0].firstChild.data)
344 singlerun_elements = stage_element.getElementsByTagName(
'singlerun')
345 if singlerun_elements:
346 self.
singlerun = int(singlerun_elements[0].firstChild.data)
350 filelistdef_elements = stage_element.getElementsByTagName(
'filelistdef')
351 if filelistdef_elements:
352 self.
filelistdef = int(filelistdef_elements[0].firstChild.data)
356 prestart_elements = stage_element.getElementsByTagName(
'prestart')
357 if prestart_elements:
358 self.
prestart = int(prestart_elements[0].firstChild.data)
362 activebase_elements = stage_element.getElementsByTagName(
'activebase')
363 if activebase_elements:
364 self.
activebase = str(activebase_elements[0].firstChild.data)
368 dropboxwait_elements = stage_element.getElementsByTagName(
'dropboxwait')
369 if dropboxwait_elements:
370 self.
dropboxwait = float(dropboxwait_elements[0].firstChild.data)
374 prestagefraction_elements = stage_element.getElementsByTagName(
'prestagefraction')
375 if prestagefraction_elements:
380 inputstream_elements = stage_element.getElementsByTagName(
'inputstream')
381 if inputstream_elements:
382 self.
inputstream = str(inputstream_elements[0].firstChild.data)
386 previousstage_elements = stage_element.getElementsByTagName(
'previousstage')
387 if previousstage_elements:
388 self.
previousstage = str(previousstage_elements[0].firstChild.data)
392 if base_stage !=
None:
400 raise XMLError(
'Previous stage and input specified for stage %s.' % self.
name)
404 mixinputdef_elements = stage_element.getElementsByTagName(
'mixinputdef')
405 if mixinputdef_elements:
406 self.
mixinputdef = str(mixinputdef_elements[0].firstChild.data)
411 raise XMLError(
'Input file and input list both specified for stage %s.' % self.
name)
417 raise XMLError(
'Input dataset and input files specified for stage %s.' % self.
name)
421 raise XMLError(
'Input list (inputlist) or inputfile is needed for textfile model.')
431 default_input_list =
''
432 previous_stage_name = default_previous_stage
435 if previous_stage_name
in default_input_lists:
436 default_input_list = default_input_lists[previous_stage_name]
440 if self.
inputstream ==
'' or default_input_list ==
'':
443 n = default_input_list.rfind(
'.')
445 n = len(default_input_list)
446 self.
inputlist =
'%s_%s%s' % (default_input_list[:n],
448 default_input_list[n:])
452 pubs_input_ok_elements = stage_element.getElementsByTagName(
'pubsinput')
453 if pubs_input_ok_elements:
454 self.
pubs_input_ok = int(pubs_input_ok_elements[0].firstChild.data)
458 maxfluxfilemb_elements = stage_element.getElementsByTagName(
'maxfluxfilemb')
459 if maxfluxfilemb_elements:
460 self.
maxfluxfilemb = int(maxfluxfilemb_elements[0].firstChild.data)
471 num_jobs_elements = stage_element.getElementsByTagName(
'numjobs')
472 if num_jobs_elements:
473 self.
num_jobs = int(num_jobs_elements[0].firstChild.data)
477 num_events_elements = stage_element.getElementsByTagName(
'numevents')
478 if num_events_elements:
479 self.
num_events = int(num_events_elements[0].firstChild.data)
483 max_files_per_job_elements = stage_element.getElementsByTagName(
'maxfilesperjob')
484 if max_files_per_job_elements:
490 run_number = stage_element.getElementsByTagName(
'runnumber')
492 self.
output_run = int(run_number[0].firstChild.data)
496 target_size_elements = stage_element.getElementsByTagName(
'targetsize')
497 if target_size_elements:
498 self.
target_size = int(target_size_elements[0].firstChild.data)
503 defname_elements = stage_element.getElementsByTagName(
'defname')
505 self.
defname = str(defname_elements[0].firstChild.data)
509 ana_defname_elements = stage_element.getElementsByTagName(
'anadefname')
510 if ana_defname_elements:
511 self.
ana_defname = str(ana_defname_elements[0].firstChild.data)
515 data_tier_elements = stage_element.getElementsByTagName(
'datatier')
516 if data_tier_elements:
517 self.
data_tier = str(data_tier_elements[0].firstChild.data)
521 data_stream_elements = stage_element.getElementsByTagName(
'datastream')
522 if len(data_stream_elements) > 0:
524 for data_stream
in data_stream_elements:
525 self.data_stream.append(str(data_stream.firstChild.data))
529 ana_data_tier_elements = stage_element.getElementsByTagName(
'anadatatier')
530 if ana_data_tier_elements:
531 self.
ana_data_tier = str(ana_data_tier_elements[0].firstChild.data)
535 ana_data_stream_elements = stage_element.getElementsByTagName(
'anadatastream')
536 if len(ana_data_stream_elements) > 0:
538 for ana_data_stream
in ana_data_stream_elements:
539 self.ana_data_stream.append(str(ana_data_stream.firstChild.data))
543 submit_script_elements = stage_element.getElementsByTagName(
'submitscript')
544 if submit_script_elements:
545 self.
submit_script = str(submit_script_elements[0].firstChild.data).split()
559 stdout=subprocess.PIPE,
560 stderr=subprocess.PIPE)
561 jobout, joberr = jobinfo.communicate()
569 raise IOError(
'Submit script %s not found.' % self.
submit_script[0])
573 init_script_elements = stage_element.getElementsByTagName(
'initscript')
574 if len(init_script_elements) > 0:
575 for init_script_element
in init_script_elements:
576 init_script = str(init_script_element.firstChild.data)
581 if init_script !=
'':
582 if larbatch_posix.exists(init_script):
583 init_script = os.path.realpath(init_script)
589 jobinfo = subprocess.Popen([
'which', init_script],
590 stdout=subprocess.PIPE,
591 stderr=subprocess.PIPE)
592 jobout, joberr = jobinfo.communicate()
598 if not larbatch_posix.exists(init_script):
599 raise IOError(
'Init script %s not found.' % init_script)
601 self.init_script.append(init_script)
605 init_source_elements = stage_element.getElementsByTagName(
'initsource')
606 if len(init_source_elements) > 0:
607 for init_source_element
in init_source_elements:
608 init_source = str(init_source_element.firstChild.data)
612 if init_source !=
'':
614 if larbatch_posix.exists(init_source):
615 init_source = os.path.realpath(init_source)
621 jobinfo = subprocess.Popen([
'which', init_source],
622 stdout=subprocess.PIPE,
623 stderr=subprocess.PIPE)
624 jobout, joberr = jobinfo.communicate()
630 if not larbatch_posix.exists(init_source):
631 raise IOError(
'Init source script %s not found.' % init_source)
637 parent_element = init_source_element.parentNode
638 if parent_element.nodeName ==
'fcl':
645 fcl = str(parent_element.firstChild.data).
strip()
646 n = self.fclname.index(fcl)
655 self.init_source.append(init_source)
659 end_script_elements = stage_element.getElementsByTagName(
'endscript')
660 if len(end_script_elements) > 0:
661 for end_script_element
in end_script_elements:
662 end_script = str(end_script_element.firstChild.data)
668 if larbatch_posix.exists(end_script):
669 end_script = os.path.realpath(end_script)
675 jobinfo = subprocess.Popen([
'which', end_script],
676 stdout=subprocess.PIPE,
677 stderr=subprocess.PIPE)
678 jobout, joberr = jobinfo.communicate()
684 if not larbatch_posix.exists(end_script):
685 raise IOError(
'End-of-job script %s not found.' % end_script)
691 parent_element = end_script_element.parentNode
692 if parent_element.nodeName ==
'fcl':
699 fcl = str(parent_element.firstChild.data).
strip()
700 n = self.fclname.index(fcl)
709 self.end_script.append(end_script)
713 project_name_elements = stage_element.getElementsByTagName(
'projectname')
714 if len(project_name_elements) > 0:
715 for project_name_element
in project_name_elements:
719 fcl_element = project_name_element.parentNode
720 if fcl_element.nodeName !=
'fcl':
721 raise XMLError(
"Found <projectname> element outside <fcl> element.")
722 fcl = str(fcl_element.firstChild.data).
strip()
727 n = self.fclname.index(fcl)
732 self.project_name.append(
'')
736 project_name = str(project_name_element.firstChild.data)
745 self.project_name.append(
'')
749 stage_name_elements = stage_element.getElementsByTagName(
'stagename')
750 if len(stage_name_elements) > 0:
751 for stage_name_element
in stage_name_elements:
755 fcl_element = stage_name_element.parentNode
756 if fcl_element.nodeName !=
'fcl':
757 raise XMLError(
"Found <stagename> element outside <fcl> element.")
758 fcl = str(fcl_element.firstChild.data).
strip()
763 n = self.fclname.index(fcl)
768 self.stage_name.append(
'')
772 stage_name = str(stage_name_element.firstChild.data)
781 self.stage_name.append(
'')
785 project_version_elements = stage_element.getElementsByTagName(
'version')
786 if len(project_version_elements) > 0:
787 for project_version_element
in project_version_elements:
791 fcl_element = project_version_element.parentNode
792 if fcl_element.nodeName !=
'fcl':
793 raise XMLError(
"Found stage level <version> element outside <fcl> element.")
794 fcl = str(fcl_element.firstChild.data).
strip()
799 n = self.fclname.index(fcl)
804 self.project_version.append(
'')
808 project_version = str(project_version_element.firstChild.data)
817 self.project_version.append(
'')
821 merge_elements = stage_element.getElementsByTagName(
'merge')
823 self.
merge = str(merge_elements[0].firstChild.data)
827 anamerge_elements = stage_element.getElementsByTagName(
'anamerge')
828 if anamerge_elements:
829 self.
anamerge = str(anamerge_elements[0].firstChild.data)
833 resource_elements = stage_element.getElementsByTagName(
'resource')
834 if resource_elements:
835 self.
resource = str(resource_elements[0].firstChild.data)
840 lines_elements = stage_element.getElementsByTagName(
'lines')
842 self.
lines = str(lines_elements[0].firstChild.data)
846 site_elements = stage_element.getElementsByTagName(
'site')
848 self.
site = str(site_elements[0].firstChild.data)
849 self.
site =
''.
join(self.site.split())
853 blacklist_elements = stage_element.getElementsByTagName(
'blacklist')
854 if blacklist_elements:
855 self.
blacklist = str(blacklist_elements[0].firstChild.data)
860 cpu_elements = stage_element.getElementsByTagName(
'cpu')
862 self.
cpu = int(cpu_elements[0].firstChild.data)
866 disk_elements = stage_element.getElementsByTagName(
'disk')
868 self.
disk = str(disk_elements[0].firstChild.data)
869 self.
disk =
''.
join(self.disk.split())
873 datafiletypes_elements = stage_element.getElementsByTagName(
'datafiletypes')
874 if datafiletypes_elements:
875 data_file_types_str = str(datafiletypes_elements[0].firstChild.data)
876 data_file_types_str =
''.
join(data_file_types_str.split())
881 memory_elements = stage_element.getElementsByTagName(
'memory')
883 self.
memory = int(memory_elements[0].firstChild.data)
887 param_elements = stage_element.getElementsByTagName(
'parameter')
888 if len(param_elements) > 0:
890 for param_element
in param_elements:
891 name = str(param_element.attributes[
'name'].firstChild.data)
892 value = str(param_element.firstChild.data)
897 output_elements = stage_element.getElementsByTagName(
'output')
898 if len(output_elements) > 0:
906 for output_element
in output_elements:
907 parent_element = output_element.parentNode
908 if parent_element.nodeName !=
'fcl':
909 output = str(output_element.firstChild.data)
912 self.output.append(output)
916 for output_element
in output_elements:
917 parent_element = output_element.parentNode
918 if parent_element.nodeName ==
'fcl':
922 fcl = str(parent_element.firstChild.data).
strip()
923 n = self.fclname.index(fcl)
927 while len(self.
output) < n+1:
928 self.output.append(
'')
932 output = str(output_element.firstChild.data)
941 self.output.append(
'')
945 TFileName_elements = stage_element.getElementsByTagName(
'TFileName')
946 if TFileName_elements:
947 self.
TFileName = str(TFileName_elements[0].firstChild.data)
951 jobsub_elements = stage_element.getElementsByTagName(
'jobsub')
953 self.
jobsub = str(jobsub_elements[0].firstChild.data)
957 jobsub_start_elements = stage_element.getElementsByTagName(
'jobsub_start')
958 if jobsub_start_elements:
959 self.
jobsub_start = str(jobsub_start_elements[0].firstChild.data)
963 jobsub_timeout_elements = stage_element.getElementsByTagName(
'jobsub_timeout')
964 if jobsub_timeout_elements:
965 self.
jobsub_timeout = int(jobsub_timeout_elements[0].firstChild.data)
969 exe_elements = stage_element.getElementsByTagName(
'exe')
970 if len(exe_elements) > 0:
978 for exe_element
in exe_elements:
979 parent_element = exe_element.parentNode
980 if parent_element.nodeName !=
'fcl':
981 exe = str(exe_element.firstChild.data)
988 for exe_element
in exe_elements:
989 parent_element = exe_element.parentNode
990 if parent_element.nodeName ==
'fcl':
994 fcl = str(parent_element.firstChild.data).
strip()
995 n = self.fclname.index(fcl)
999 while len(self.
exe) < n+1:
1004 exe = str(exe_element.firstChild.data)
1011 if len(self.
exe) > 0:
1017 schema_elements = stage_element.getElementsByTagName(
'schema')
1019 self.
schema = str(schema_elements[0].firstChild.data)
1023 validate_on_worker_elements = stage_element.getElementsByTagName(
'check')
1024 if validate_on_worker_elements:
1029 copy_to_fts_elements = stage_element.getElementsByTagName(
'copy')
1030 if copy_to_fts_elements:
1031 self.
copy_to_fts = int(copy_to_fts_elements[0].firstChild.data)
1035 cvmfs_elements = stage_element.getElementsByTagName(
'cvmfs')
1037 self.
cvmfs = int(cvmfs_elements[0].firstChild.data)
1041 stash_elements = stage_element.getElementsByTagName(
'stash')
1043 self.
stash = int(stash_elements[0].firstChild.data)
1047 singularity_elements = stage_element.getElementsByTagName(
'singularity')
1048 if singularity_elements:
1049 self.
singularity = int(singularity_elements[0].firstChild.data)
1053 script_elements = stage_element.getElementsByTagName(
'script')
1055 self.
script = script_elements[0].firstChild.data
1062 jobinfo = subprocess.Popen([
'which', self.
script],
1063 stdout=subprocess.PIPE,
1064 stderr=subprocess.PIPE)
1065 jobout, joberr = jobinfo.communicate()
1069 script_path = jobout.splitlines()[0].
strip()
1072 if script_path ==
'' or not larbatch_posix.access(script_path, os.X_OK):
1073 raise IOError(
'Script %s not found.' % self.
script)
1074 self.
script = script_path
1078 start_script_elements = stage_element.getElementsByTagName(
'startscript')
1079 if start_script_elements:
1080 self.
start_script = start_script_elements[0].firstChild.data
1087 jobinfo = subprocess.Popen([
'which', self.
start_script],
1088 stdout=subprocess.PIPE,
1089 stderr=subprocess.PIPE)
1090 jobout, joberr = jobinfo.communicate()
1094 script_path = jobout.splitlines()[0].
strip()
1101 stop_script_elements = stage_element.getElementsByTagName(
'stopscript')
1102 if stop_script_elements:
1103 self.
stop_script = stop_script_elements[0].firstChild.data
1110 jobinfo = subprocess.Popen([
'which', self.
stop_script],
1111 stdout=subprocess.PIPE,
1112 stderr=subprocess.PIPE)
1113 jobout, joberr = jobinfo.communicate()
1117 script_path = jobout.splitlines()[0].
strip()