def dojobsub(project, stage, makeup, recur, dryrun):

    # Get a samweb client handle (used below for sam project names and makeup datasets).
    samweb = project_utilities.samweb()

    # Make temporary directories for the files that will be shipped with the job.
    tmpdir = tempfile.mkdtemp()
    tmpworkdir = tempfile.mkdtemp()

    # Extra '-f' arguments to pass to jobsub_submit.
    jobsub_workdir_files_args = []

    # If this stage has an input file list, copy it into the temporary work directory.
    input_list_name = ''
    if stage.inputlist != '':
        input_list_name = os.path.basename(stage.inputlist)
        work_list_name = os.path.join(tmpworkdir, input_list_name)
        if stage.inputlist != work_list_name:
            input_files = larbatch_posix.readlines(stage.inputlist)
            print('Making input list.')
            work_list = safeopen(work_list_name)
            for input_file in input_files:
                print('Adding input file %s' % input_file)
                work_list.write('%s\n' % input_file.strip())
            work_list.close()
            print('Done making input list.')
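    # Illustrative note (added; paths hypothetical): the copied work list is a plain
    # text file with one input file path per line, e.g.
    #
    #   /pnfs/myexp/scratch/user/gen/123/gen_0.root
    #   /pnfs/myexp/scratch/user/gen/123/gen_1.root
    #
    # It ends up inside the work tarball built below, so the worker reads it from the
    # unpacked tarball rather than from the original location.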
    # Copy the fcl file(s) for this stage into the work directory.
    fcls = project.get_fcl(stage.fclname)
    for fcl in fcls:
        workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
        if os.path.abspath(fcl) != os.path.abspath(workfcl):
            larbatch_posix.copy(fcl, workfcl)

    # Generate a wrapper fcl that includes each stage fcl plus run-time overrides.
    wrapper_fcl_name = os.path.join(tmpworkdir, 'wrapper.fcl')
    wrapper_fcl = safeopen(wrapper_fcl_name)
    stageNum = 0
    original_project_name = project.name
    original_stage_name = stage.name
    original_project_version = project.version

    for fcl in fcls:
        wrapper_fcl.write('#---STAGE %d\n' % stageNum)
        wrapper_fcl.write('#include "%s"\n' % os.path.basename(fcl))
        wrapper_fcl.write('\n')
        # If the xml file defines file-catalog metadata, add the corresponding
        # FileCatalogMetadata overrides for this stage.
        xml_has_metadata = project.file_type != '' or \
                           project.run_type != ''
        if xml_has_metadata:

            if project.release_tag != '':
                wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
                                  project.release_tag)
            else:
                wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "test"\n')
            if project.file_type:
                wrapper_fcl.write('services.FileCatalogMetadata.fileType: "%s"\n' % \
                                  project.file_type)
            if project.run_type:
                wrapper_fcl.write('services.FileCatalogMetadata.runType: "%s"\n' % \
                                  project.run_type)

            # Temporarily swap in any per-stage project/stage names before generating
            # experiment-specific sam metadata, then restore the originals.
            if stageNum < len(stage.project_name) and stage.project_name[stageNum] != '':
                project.name = stage.project_name[stageNum]
            if stageNum < len(stage.stage_name) and stage.stage_name[stageNum] != '':
                stage.name = stage.stage_name[stageNum]
            if stageNum < len(stage.project_version) and stage.project_version[stageNum] != '':
                project.version = stage.project_version[stageNum]
            sam_metadata = project_utilities.get_sam_metadata(project, stage)
            if sam_metadata:
                wrapper_fcl.write(sam_metadata)
            project.name = original_project_name
            stage.name = original_stage_name
            project.version = original_project_version
        # Pubs output (or an explicitly specified output run): override the first run number.
        if (not stage.pubs_input and stage.pubs_output) or stage.output_run:
            wrapper_fcl.write('source.firstRun: %d\n' % stage.output_run)

        # Flux-file handling for the generator stage only.
        if stage.maxfluxfilemb != 0 and stageNum == 0:
            wrapper_fcl.write('physics.producers.generator.FluxCopyMethod: "IFDH"\n')
            wrapper_fcl.write('physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
        wrapper_fcl.write('#---END_STAGE\n')
        stageNum = 1 + stageNum

    wrapper_fcl.close()
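    # Illustrative sketch (added; fcl names and version are placeholders): for two
    # stage fcls the generated wrapper.fcl reads roughly
    #
    #   #---STAGE 0
    #   #include "prodgenie.fcl"
    #   services.FileCatalogMetadata.applicationVersion: "v09_00_00"
    #   physics.producers.generator.FluxCopyMethod: "IFDH"
    #   #---END_STAGE
    #   #---STAGE 1
    #   #include "standard_g4.fcl"
    #   ...
    #   #---END_STAGE
    #
    # The #---STAGE / #---END_STAGE markers delimit the configuration of each art stage.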
    # Locate the experiment setup script.  If it does not live on cvmfs, copy it to the
    # stage work directory and ship it with the job.
    abssetupscript = project_utilities.get_setup_script_path()

    setupscript = ''
    if not abssetupscript.startswith('/cvmfs/'):
        setupscript = os.path.join(stage.workdir, 'setup_experiment.sh')
        larbatch_posix.copy(abssetupscript, setupscript)
        jobsub_workdir_files_args.extend(['-f', setupscript])
    # Choose the name of the copied batch script and copy it to the temporary directory.
    if stage.batchname != '':
        workname = stage.batchname
    else:
        workname = '%s-%s-%s' % (stage.name, project.name, project.release_tag)
        workname = workname + os.path.splitext(stage.script)[1]
    workscript = os.path.join(tmpdir, workname)
    if stage.script != workscript:
        larbatch_posix.copy(stage.script, workscript)
    # Copy the sam start-project script, if any.
    workstartscript = ''
    workstartname = ''
    if stage.start_script != '':
        workstartname = 'start-%s' % workname
        workstartscript = os.path.join(tmpdir, workstartname)
        if stage.start_script != workstartscript:
            larbatch_posix.copy(stage.start_script, workstartscript)

    # Copy the sam stop-project script, if any.
    workstopscript = ''
    workstopname = ''
    if stage.stop_script != '':
        workstopname = 'stop-%s' % workname
        workstopscript = os.path.join(tmpdir, workstopname)
        if stage.stop_script != workstopscript:
            larbatch_posix.copy(stage.stop_script, workstopscript)
    # Copy worker initialization scripts to the work directory.
    for init_script in stage.init_script:
        if init_script != '':
            if not larbatch_posix.exists(init_script):
                raise RuntimeError('Worker initialization script %s does not exist.\n' % \
                                   init_script)
            work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
            if init_script != work_init_script:
                larbatch_posix.copy(init_script, work_init_script)

    # Collapse stage.init_script to a single entry: empty, the one script, or a
    # generated wrapper that runs all of them in order.
    n = len(stage.init_script)
    if n == 0:
        stage.init_script = ''
    elif n == 1:
        stage.init_script = stage.init_script[0]
    else:

        # Generate a bash wrapper that runs each initialization script and stops at
        # the first nonzero exit status.
        work_init_wrapper = os.path.join(tmpworkdir, 'init_wrapper.sh')
        f = open(work_init_wrapper, 'w')
        f.write('#! /bin/bash\n')
        for init_script in stage.init_script:
            f.write('echo "Executing %s"\n' % os.path.basename(init_script))
            f.write('./%s\n' % os.path.basename(init_script))
            f.write('status=$?\n')
            f.write('echo "%s finished with status $status"\n' % os.path.basename(init_script))
            f.write('if [ $status -ne 0 ]; then\n')
            f.write('  exit $status\n')
            f.write('fi\n')
        f.write('echo "Done executing initialization scripts."\n')
        f.close()
        stage.init_script = work_init_wrapper
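    # Illustrative sketch (added; script names hypothetical): for init scripts a.sh and
    # b.sh the generated init_wrapper.sh reads
    #
    #   #! /bin/bash
    #   echo "Executing a.sh"
    #   ./a.sh
    #   status=$?
    #   echo "a.sh finished with status $status"
    #   if [ $status -ne 0 ]; then
    #     exit $status
    #   fi
    #   ...same block repeated for b.sh...
    #   echo "Done executing initialization scripts."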
    # Copy worker initialization source scripts to the work directory.
    for init_source in stage.init_source:
        if init_source != '':
            if not larbatch_posix.exists(init_source):
                raise RuntimeError('Worker initialization source script %s does not exist.\n' % \
                                   init_source)
            work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
            if init_source != work_init_source:
                larbatch_posix.copy(init_source, work_init_source)

    # Collapse stage.init_source to a single entry: empty, the one script, or a
    # generated wrapper that sources all of them in order.
    n = len(stage.init_source)
    if n == 0:
        stage.init_source = ''
    elif n == 1:
        stage.init_source = stage.init_source[0]
    else:

        # Generate a wrapper that sources each initialization source script.
        work_init_source_wrapper = os.path.join(tmpworkdir, 'init_source_wrapper.sh')
        f = open(work_init_source_wrapper, 'w')
        for init_source in stage.init_source:
            f.write('echo "Sourcing %s"\n' % os.path.basename(init_source))
            f.write('source %s\n' % os.path.basename(init_source))
        f.write('echo "Done sourcing initialization scripts."\n')
        f.close()
        stage.init_source = work_init_source_wrapper
    # Copy worker end-of-job scripts to the work directory.
    for end_script in stage.end_script:
        if end_script != '':
            if not larbatch_posix.exists(end_script):
                raise RuntimeError('Worker end-of-job script %s does not exist.\n' % end_script)
            work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
            if end_script != work_end_script:
                larbatch_posix.copy(end_script, work_end_script)

    # Collapse stage.end_script to a single entry: empty, the one script, or a
    # generated wrapper that runs all of them in order.
    n = len(stage.end_script)
    if n == 0:
        stage.end_script = ''
    elif n == 1:
        stage.end_script = stage.end_script[0]
    else:

        # Generate a bash wrapper that runs each end-of-job script and stops at the
        # first nonzero exit status.
        work_end_wrapper = os.path.join(tmpworkdir, 'end_wrapper.sh')
        f = open(work_end_wrapper, 'w')
        f.write('#! /bin/bash\n')
        for end_script in stage.end_script:
            f.write('echo "Executing %s"\n' % os.path.basename(end_script))
            f.write('./%s\n' % os.path.basename(end_script))
            f.write('status=$?\n')
            f.write('echo "%s finished with status $status"\n' % os.path.basename(end_script))
            f.write('if [ $status -ne 0 ]; then\n')
            f.write('  exit $status\n')
            f.write('fi\n')
        f.write('echo "Done executing finalization scripts."\n')
        f.close()
        stage.end_script = work_end_wrapper
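    # Note (added): end_wrapper.sh mirrors init_wrapper.sh; it runs each end-of-job
    # script in order and exits with the first nonzero status, so a failed
    # finalization step is visible in the job's exit code.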
    # Copy midstage initialization source scripts to the work directory.
    # stage.mid_source is a dictionary of script lists keyed by stage number.
    for istage in stage.mid_source:
        for mid_source in stage.mid_source[istage]:
            if mid_source != '':
                if not larbatch_posix.exists(mid_source):
                    raise RuntimeError('Worker midstage initialization source script %s does not exist.\n' % mid_source)
                work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
                if mid_source != work_mid_source:
                    larbatch_posix.copy(mid_source, work_mid_source)

    # Generate a wrapper that sources only the scripts registered for the current
    # stage (the stage number is supplied to the wrapper as $stage).
    if len(stage.mid_source) > 0:
        work_mid_source_wrapper = os.path.join(tmpworkdir, 'mid_source_wrapper.sh')
        f = open(work_mid_source_wrapper, 'w')
        for istage in stage.mid_source:
            for mid_source in stage.mid_source[istage]:
                f.write('if [ $stage -eq %d ]; then\n' % istage)
                f.write('  echo "Sourcing %s"\n' % os.path.basename(mid_source))
                f.write('  source %s\n' % os.path.basename(mid_source))
                f.write('fi\n')
        f.write('echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
        f.close()
        stage.mid_source = work_mid_source_wrapper
    else:
        stage.mid_source = ''
    # Copy midstage finalization scripts to the work directory.
    # stage.mid_script is a dictionary of script lists keyed by stage number.
    for istage in stage.mid_script:
        for mid_script in stage.mid_script[istage]:
            if mid_script != '':
                if not larbatch_posix.exists(mid_script):
                    raise RuntimeError('Worker midstage finalization script %s does not exist.\n' % mid_script)
                work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
                if mid_script != work_mid_script:
                    larbatch_posix.copy(mid_script, work_mid_script)

    # Generate a bash wrapper that runs the finalization scripts registered for the
    # stage passed as the first argument, stopping on the first failure.
    if len(stage.mid_script) > 0:
        work_mid_wrapper = os.path.join(tmpworkdir, 'mid_wrapper.sh')
        f = open(work_mid_wrapper, 'w')
        f.write('#! /bin/bash\n')
        f.write('stage=$1\n')
        for istage in stage.mid_script:
            for mid_script in stage.mid_script[istage]:
                f.write('if [ $stage -eq %d ]; then\n' % istage)
                f.write('  echo "Executing %s"\n' % os.path.basename(mid_script))
                f.write('  ./%s\n' % os.path.basename(mid_script))
                f.write('  status=$?\n')
                f.write('  echo "%s finished with status $status"\n' % os.path.basename(mid_script))
                f.write('  if [ $status -ne 0 ]; then\n')
                f.write('    exit $status\n')
                f.write('  fi\n')
                f.write('fi\n')
        f.write('echo "Done executing midstage finalization scripts for stage $stage."\n')
        f.close()
        stage.mid_script = work_mid_wrapper
    else:
        stage.mid_script = ''
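    # Illustrative note (added): the midstage wrappers select scripts by stage number,
    # so a worker running fcl stage 1 would effectively do
    #
    #   source mid_source_wrapper.sh   # with $stage set by the caller
    #   ./mid_wrapper.sh 1
    #
    # and only the scripts registered for stage 1 are executed.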
    # Copy helper scripts to the work directory.
    helpers = ('root_metadata.py',
               'validate_in_job.py')

    for helper in helpers:

        # Find the helper script on the execution path.
        jobinfo = subprocess.Popen(['which', helper],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        if rc == 0:
            helper_path = jobout.splitlines()[0].strip()
            work_helper = os.path.join(tmpworkdir, helper)
            if helper_path != work_helper:
                larbatch_posix.copy(helper_path, work_helper)
        else:
            print('Helper script %s not found.' % helper)
    # Copy helper python modules to the work directory.
    helper_modules = ('larbatch_posix',
                      'project_utilities',
                      'larbatch_utilities',
                      'experiment_utilities')

    for helper_module in helper_modules:

        # Ask a python subprocess where the module lives on the import path.
        jobinfo = subprocess.Popen(['python'],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        cmd = 'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
        jobout, joberr = jobinfo.communicate(cmd)
        rc = jobinfo.poll()
        if rc == 0:
            helper_path = jobout.splitlines()[-1].strip()
            work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
            if helper_path != work_helper:
                larbatch_posix.copy(helper_path, work_helper)
        else:
            print('Helper python module %s not found.' % helper_module)
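    # Illustrative note (added; path hypothetical): each helper module is located by
    # printing its __file__ attribute in a python subprocess, e.g. for
    # project_utilities the last line of output might be
    #
    #   /cvmfs/larsoft.opensciencegrid.org/.../python/project_utilities.py
    #
    # and that file is what gets copied into the work tarball.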
    # For a makeup action, require a previous --check, delete bad jobs, and work out
    # what needs to be resubmitted.
    makeup_defname = ''
    makeup_count = 0
    procmap = ''
    if makeup:

        # Require that the previous submission has been checked.
        checked_file = os.path.join(stage.bookdir, 'checked')
        if not larbatch_posix.exists(checked_file):
            raise RuntimeError('Wait for any running jobs to finish and run project.py --check')

        # Delete the output, log, and bookkeeping subdirectories of bad jobs.
        bad_filename = os.path.join(stage.bookdir, 'bad.list')
        if larbatch_posix.exists(bad_filename):
            lines = larbatch_posix.readlines(bad_filename)
            for line in lines:
                bad_subdir = line.strip()
                if bad_subdir != '':
                    bad_path = os.path.join(stage.outdir, bad_subdir)
                    if larbatch_posix.exists(bad_path):
                        print('Deleting %s' % bad_path)
                        larbatch_posix.rmtree(bad_path)
                    bad_path = os.path.join(stage.logdir, bad_subdir)
                    if larbatch_posix.exists(bad_path):
                        print('Deleting %s' % bad_path)
                        larbatch_posix.rmtree(bad_path)
                    bad_path = os.path.join(stage.bookdir, bad_subdir)
                    if larbatch_posix.exists(bad_path):
                        print('Deleting %s' % bad_path)
                        larbatch_posix.rmtree(bad_path)
        # For file-list input, collect the list of missing files and regenerate the
        # work input list to contain only those files.
        missing_files = []
        if stage.inputdef == '':
            missing_filename = os.path.join(stage.bookdir, 'missing_files.list')
            if larbatch_posix.exists(missing_filename):
                lines = larbatch_posix.readlines(missing_filename)
                for line in lines:
                    words = line.split()
                    if len(words) > 0:
                        missing_files.append(words[0])
            makeup_count = len(missing_files)
            print('Makeup list contains %d files.' % makeup_count)

            if input_list_name != '':
                work_list_name = os.path.join(tmpworkdir, input_list_name)
                if larbatch_posix.exists(work_list_name):
                    larbatch_posix.remove(work_list_name)
                work_list = safeopen(work_list_name)
                for missing_file in missing_files:
                    work_list.write('%s\n' % missing_file)
                work_list.close()
        # For sam-free input (no dataset, no input file, no input list), construct a
        # process map that reuses the process numbers of the missing jobs.
        if stage.inputdef == '' and stage.inputfile == '' and stage.inputlist == '':
            procs = set(range(stage.num_jobs))

            # Remove the process numbers of jobs that produced output.
            output_files = os.path.join(stage.bookdir, 'files.list')
            if larbatch_posix.exists(output_files):
                lines = larbatch_posix.readlines(output_files)
                for line in lines:
                    dir = os.path.basename(os.path.dirname(line))
                    dir_parts = dir.split('_')
                    if len(dir_parts) > 1:
                        proc = int(dir_parts[1])
                        if proc in procs:
                            procs.remove(proc)
            if len(procs) != makeup_count:
                raise RuntimeError('Makeup process list has different length than makeup count.')

            # Write the remaining process numbers into procmap.txt, one per line.
            procmap = 'procmap.txt'
            procmap_path = os.path.join(tmpworkdir, procmap)
            procmap_file = safeopen(procmap_path)
            for proc in sorted(procs):
                procmap_file.write('%d\n' % proc)
            procmap_file.close()
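        # Illustrative example (added): if the original submission had 8 jobs and only
        # processes 2 and 5 are missing, procmap.txt contains
        #
        #   2
        #   5
        #
        # so the makeup jobs reuse exactly those process numbers.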
        # For sam dataset input, build a makeup dataset from the consumer process ids
        # of jobs that consumed successfully.
        cpids = []
        cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
        if larbatch_posix.exists(cpids_filename):
            cpids_files = larbatch_posix.readlines(cpids_filename)
            for line in cpids_files:
                cpids.append(line.strip())

        if len(cpids) > 0:
            project_utilities.test_kca()
            makeup_defname = samweb.makeProjectName(stage.inputdef) + '_makeup'

            # Construct a comma-separated list of consumer process ids.
            cpids_list = ''
            sep = ''
            for cpid in cpids:
                cpids_list = cpids_list + '%s%s' % (sep, cpid)
                sep = ','

            # Makeup dimension: files of the input dataset that were not consumed.
            dim = '(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)

            # Create the makeup dataset definition.
            print('Creating makeup sam dataset definition %s' % makeup_defname)
            project_utilities.test_kca()
            samweb.createDefinition(defname=makeup_defname, dims=dim)
            makeup_count = samweb.countFiles(defname=makeup_defname)
            print('Makeup dataset contains %d files.' % makeup_count)
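        # Illustrative example (added; names hypothetical): for stage.inputdef
        # "prod_mc_v1" and consumer process ids 101,102 the makeup definition is built
        # from the dimension
        #
        #   (defname: prod_mc_v1) minus (consumer_process_id 101,102 and consumed_status consumed)
        #
        # i.e. every file of the input dataset that was not successfully consumed.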
    # Make a tarball of the work directory.
    tmptar = '%s/work.tar' % tmpworkdir
    jobinfo = subprocess.Popen(['tar', '-cf', tmptar, '-C', tmpworkdir,
                                '--mtime=2018-01-01',
                                '--exclude=work.tar', '.'],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    jobout, joberr = jobinfo.communicate()
    rc = jobinfo.poll()
    if rc != 0:
        raise RuntimeError('Failed to create work tarball in %s' % tmpworkdir)

    # Calculate the md5 hash of the tarball.
    hasher = hashlib.md5()
    f = open(tmptar, 'rb')
    buf = f.read(1024)
    while len(buf) > 0:
        hasher.update(buf)
        buf = f.read(1024)
    f.close()
    hash = hasher.hexdigest()

    # Copy the tarball to the stage work directory under its hashed name and pass it
    # to jobsub with -f.
    hashtar = '%s/work%s.tar' % (stage.workdir, hash)
    if not larbatch_posix.exists(hashtar):
        larbatch_posix.copy(tmptar, hashtar)
    jobsub_workdir_files_args.extend(['-f', hashtar])
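    # Note (added): pinning --mtime removes file timestamps as a source of variation in
    # the tarball, so identical work directory contents tend to map to the same md5
    # hash, and a resubmission of an unchanged stage can reuse the work<hash>.tar that
    # already exists in stage.workdir instead of copying a new one.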
    # Sam input dataset definition: the stage definition, or the makeup definition.
    inputdef = stage.inputdef
    if makeup and makeup_defname != '':
        inputdef = makeup_defname

    # Construct the sam project name.
    prjname = ''
    if inputdef != '':
        project_utilities.test_kca()
        prjname = samweb.makeProjectName(inputdef)

    # Construct the mix project name, if there is a mix input dataset.
    mixprjname = ''
    if stage.mixinputdef != '':
        project_utilities.test_kca()
        mixprjname = 'mix_%s' % samweb.makeProjectName(stage.mixinputdef)

    # Optionally prestart the sam project.
    prj_started = False
    if prjname != '' and stage.prestart != 0:
        ok = project_utilities.start_project(inputdef, prjname,
                                             stage.num_jobs * stage.max_files_per_job,
                                             stage.recur, stage.filelistdef)
        if ok != 0:
            print('Failed to start project.')
        else:
            prj_started = True

    # Also prestart the mix project, if the main project started.
    if mixprjname != '' and prj_started:
        ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
        if ok != 0:
            print('Failed to start mix project.')
    # Get the grid role.
    role = project_utilities.get_role()
    if project.role != '':
        role = project.role

    # Construct the main jobsub_submit command.
    command = ['jobsub_submit']

    # Jobsub options.
    command.append('--group=%s' % project_utilities.get_experiment())
    command.append('--role=%s' % role)
    command.extend(jobsub_workdir_files_args)
    if project.server != '-' and project.server != '':
        command.append('--jobsub-server=%s' % project.server)
    if stage.resource != '':
        command.append('--resource-provides=usage_model=%s' % stage.resource)
    elif project.resource != '':
        command.append('--resource-provides=usage_model=%s' % project.resource)
    if stage.lines != '':
        command.append('--lines=%s' % stage.lines)
    elif project.lines != '':
        command.append('--lines=%s' % project.lines)
    if stage.site != '':
        command.append('--site=%s' % stage.site)
    if stage.blacklist != '':
        command.append('--blacklist=%s' % stage.blacklist)
    if stage.cpu != 0:
        command.append('--cpu=%d' % stage.cpu)
    if stage.disk != '':
        command.append('--disk=%s' % stage.disk)
    if stage.memory != 0:
        command.append('--memory=%d' % stage.memory)
    if project.os != '':
        if stage.singularity == 0:
            command.append('--OS=%s' % project.os)
        else:

            # Run inside a singularity container matching the requested OS.
            p = project_utilities.get_singularity(project.os)
            if p != '':
                if (stage.num_jobs > 1 or project.force_dag) and \
                   (inputdef != '' or stage.mixinputdef != ''):

                    # Dag submission: escape the inner quotes.
                    command.append(r"""--lines='+SingularityImage=\"%s\"'""" % p)
                else:
                    command.append(r"""--lines='+SingularityImage="%s"'""" % p)
            else:
                raise RuntimeError('No singularity image found for %s' % project.os)
    # Batch job count.
    if not stage.pubs_output:
        if not makeup:
            command_njobs = stage.num_jobs
            command.extend(['-N', '%d' % command_njobs])
        else:
            command_njobs = min(makeup_count, stage.num_jobs)
            command.extend(['-N', '%d' % command_njobs])
    else:
        if stage.inputdef != '':
            command_njobs = stage.num_jobs
        else:
            command_njobs = stage.num_jobs
        command.extend(['-N', '%d' % command_njobs])

    # Extra jobsub options from the stage and the experiment defaults.
    if stage.jobsub != '':
        for word in stage.jobsub.split():
            command.append(word)
    opt = project_utilities.default_jobsub_submit_options()
    if opt != '':
        for word in opt.split():
            command.append(word)

    # Condor requirements.
    if stage.cvmfs != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
    if stage.stash != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
    if stage.singularity != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

    # Batch script (the first positional argument after the jobsub options).
    workurl = "file://%s" % workscript
    command.append(workurl)
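    # Illustrative sketch (added; values hypothetical): at this point the command list
    # corresponds to something like
    #
    #   jobsub_submit --group=myexp --role=Analysis -f /path/work<hash>.tar -N 10 \
    #       --resource-provides=usage_model=OPPORTUNISTIC file:///tmp/.../mystage-myproj-v1.sh
    #
    # Options appended before the file:// URL are consumed by jobsub itself; everything
    # appended after it below is passed through to the worker script.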
    # Worker script options.
    if stage.max_files_per_job != 0:
        command_max_files_per_job = stage.max_files_per_job
        command.extend(['--nfile', '%d' % command_max_files_per_job])
    command.extend([' --group', project_utilities.get_experiment()])
    command.extend([' -g'])
    command.extend([' -c', 'wrapper.fcl'])
    command.extend([' --ups', ','.join(project.ups)])
    if project.release_tag != '':
        command.extend([' -r', project.release_tag])
    command.extend([' -b', project.release_qual])
    if project.local_release_dir != '':
        command.extend([' --localdir', project.local_release_dir])
    if project.local_release_tar != '':
        command.extend([' --localtar', project.local_release_tar])
    command.extend([' --workdir', stage.workdir])
    command.extend([' --outdir', stage.outdir])
    command.extend([' --logdir', stage.logdir])
    if stage.dirsize > 0:
        command.extend([' --dirsize', '%d' % stage.dirsize])
    if stage.dirlevels > 0:
        command.extend([' --dirlevels', '%d' % stage.dirlevels])
    if stage.exe:
        if type(stage.exe) == type([]):
            command.extend([' --exe', ':'.join(stage.exe)])
        else:
            command.extend([' --exe', stage.exe])
    if stage.schema != '':
        command.extend([' --sam_schema', stage.schema])
    if project.os != '':
        command.extend([' --os', project.os])

    # Pubs output of a single subrun: fix the process number and run in single mode.
    if not stage.pubs_input and stage.pubs_output and stage.output_subruns[0] > 0:
        command.extend(['--process', '%d' % (stage.output_subruns[0]-1)])
        command.append('--single')
    # Input options.
    if stage.inputfile != '':
        command.extend([' -s', stage.inputfile])
    elif input_list_name != '':
        command.extend([' -S', input_list_name])
    elif inputdef != '':
        command.extend([' --sam_defname', inputdef,
                        ' --sam_project', prjname])
        if stage.recur:
            command.extend([' --recur'])
    if stage.mixinputdef != '':
        command.extend([' --mix_defname', stage.mixinputdef,
                        ' --mix_project', mixprjname])
    if stage.inputmode != '':
        command.extend([' --inputmode', stage.inputmode])
    command.extend([' -n', '%d' % stage.num_events])
    if stage.inputdef == '':
        command.extend([' --njobs', '%d' % stage.num_jobs])
    for ftype in stage.datafiletypes:
        command.extend(['--data_file_type', ftype])
    if procmap != '':
        command.extend([' --procmap', procmap])

    # Output options.
    if stage.output:
        if type(stage.output) == type([]):
            command.extend([' --output', ':'.join(stage.output)])
        else:
            command.extend([' --output', stage.output])
    if stage.TFileName != '':
        command.extend([' --TFileName', stage.TFileName])

    # Worker initialization / finalization script options.
    if stage.init_script != '':
        command.extend([' --init-script', os.path.basename(stage.init_script)])
    if stage.init_source != '':
        command.extend([' --init-source', os.path.basename(stage.init_source)])
    if stage.end_script != '':
        command.extend([' --end-script', os.path.basename(stage.end_script)])
    if stage.mid_source != '':
        command.extend([' --mid-source', os.path.basename(stage.mid_source)])
    if stage.mid_script != '':
        command.extend([' --mid-script', os.path.basename(stage.mid_script)])
    if abssetupscript != '':
        command.extend([' --init', abssetupscript])

    # On-worker validation and sam declaration.
    if stage.validate_on_worker == 1:
        print('Validation will be done on the worker node %d' % stage.validate_on_worker)
        command.extend([' --validate'])
        command.extend([' --declare'])
    if type(stage.fclname) == type([]) and len(stage.fclname) > 1:
        command.extend([' --maintain_parentage'])
    if stage.copy_to_fts == 1:
        command.extend([' --copy'])
    # If there is a single job and no dag, let the batch job itself start the sam project.
    if (prjname != '' or mixprjname != '') and command_njobs == 1 and \
            not project.force_dag and not prj_started:
        command.extend([' --sam_start',
                        ' --sam_station', project_utilities.get_experiment(),
                        ' --sam_group', project_utilities.get_experiment()])

    # For multi-job or forced-dag submissions, collect the sam projects that need
    # separate start and stop jobs.
    start_commands = []
    stop_commands = []
    dag_prjs = []
    if command_njobs > 1 or project.force_dag:
        if inputdef != '':
            dag_prjs.append([inputdef, prjname])
        if stage.mixinputdef != '':
            dag_prjs.append([stage.mixinputdef, mixprjname])

    for dag_prj in dag_prjs:

        # Make sure the start and stop project scripts exist.
        if workstartname == '' or workstopname == '':
            raise RuntimeError('Sam start or stop project script not found.')
        # Build the sam start-project command for this project.
        start_command = ['jobsub']

        # Jobsub options.
        start_command.append('--group=%s' % project_utilities.get_experiment())
        if setupscript != '':
            start_command.append('-f %s' % setupscript)
        if stage.resource != '':
            start_command.append('--resource-provides=usage_model=%s' % stage.resource)
        elif project.resource != '':
            start_command.append('--resource-provides=usage_model=%s' % project.resource)
        if stage.lines != '':
            start_command.append('--lines=%s' % stage.lines)
        elif project.lines != '':
            start_command.append('--lines=%s' % project.lines)
        if stage.site != '':
            start_command.append('--site=%s' % stage.site)
        if stage.blacklist != '':
            start_command.append('--blacklist=%s' % stage.blacklist)
        if project.os != '':
            if stage.singularity == 0:
                start_command.append('--OS=%s' % project.os)
            else:
                p = project_utilities.get_singularity(project.os)
                if p != '':
                    start_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
                else:
                    raise RuntimeError('No singularity image found for %s' % project.os)
        if stage.jobsub_start != '':
            for word in stage.jobsub_start.split():
                start_command.append(word)
        opt = project_utilities.default_jobsub_submit_options()
        if opt != '':
            for word in opt.split():
                start_command.append(word)
        if stage.cvmfs != 0:
            start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
        if stage.stash != 0:
            start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
        if stage.singularity != 0:
            start_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

        # Start-project script.
        workstarturl = "file://%s" % workstartscript
        start_command.append(workstarturl)

        # Start-project script options.
        start_command.extend([' --sam_station', project_utilities.get_experiment(),
                              ' --sam_group', project_utilities.get_experiment(),
                              ' --sam_defname', dag_prj[0],
                              ' --sam_project', dag_prj[1]])
        if stage.recur:
            start_command.extend([' --recur'])
        if abssetupscript != '':
            start_command.extend([' --init', abssetupscript])
        if stage.num_jobs > 0 and stage.max_files_per_job > 0:
            start_command.extend([' --max_files',
                                  '%d' % (stage.num_jobs * stage.max_files_per_job)])
        if stage.prestagefraction > 0.:
            start_command.extend([' --prestage_fraction',
                                  '%f' % stage.prestagefraction])
        start_command.extend([' --logdir', stage.logdir])

        # Only add a start command if the project was not already prestarted, or if
        # prestaging is requested.
        if not prj_started or stage.prestagefraction > 0.:
            start_commands.append(start_command)
        # Build the sam stop-project command for this project.
        stop_command = ['jobsub']

        # Jobsub options.
        stop_command.append('--group=%s' % project_utilities.get_experiment())
        if setupscript != '':
            stop_command.append('-f %s' % setupscript)
        if stage.resource != '':
            stop_command.append('--resource-provides=usage_model=%s' % stage.resource)
        elif project.resource != '':
            stop_command.append('--resource-provides=usage_model=%s' % project.resource)
        if stage.lines != '':
            stop_command.append('--lines=%s' % stage.lines)
        elif project.lines != '':
            stop_command.append('--lines=%s' % project.lines)
        if stage.site != '':
            stop_command.append('--site=%s' % stage.site)
        if stage.blacklist != '':
            stop_command.append('--blacklist=%s' % stage.blacklist)
        if project.os != '':
            if stage.singularity == 0:
                stop_command.append('--OS=%s' % project.os)
            else:
                p = project_utilities.get_singularity(project.os)
                if p != '':
                    stop_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
                else:
                    raise RuntimeError('No singularity image found for %s' % project.os)
        if stage.jobsub_start != '':
            for word in stage.jobsub_start.split():
                stop_command.append(word)
        opt = project_utilities.default_jobsub_submit_options()
        if opt != '':
            for word in opt.split():
                stop_command.append(word)
        if stage.cvmfs != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
        if stage.stash != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
        if stage.singularity != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

        # Stop-project script.
        workstopurl = "file://%s" % workstopscript
        stop_command.append(workstopurl)

        # Stop-project script options.
        stop_command.extend([' --sam_station', project_utilities.get_experiment(),
                             ' --sam_project', dag_prj[1]])
        stop_command.extend([' --logdir', stage.logdir])
        if abssetupscript != '':
            stop_command.extend([' --init', abssetupscript])

        stop_commands.append(stop_command)
    # If there are start or stop commands, write a dagNabbit dag file that runs them
    # before and after the worker jobs.
    if len(start_commands) > 0 or len(stop_commands) > 0:

        # Create the dag input file.
        dagfilepath = os.path.join(tmpdir, 'submit.dag')
        dag = safeopen(dagfilepath)
        dag.write('<serial>\n')

        # Write the start-project section.
        if len(start_commands) > 0:
            dag.write('\n<parallel>\n\n')
            for start_command in start_commands:
                first = True
                for word in start_command:
                    if not first:
                        dag.write(' ')
                    if word[:6] == 'jobsub':
                        word = 'jobsub'
                    dag.write(word)
                    first = False
                dag.write('\n\n')
            dag.write('</parallel>\n')
        # Write the main worker section, one jobsub command per process.
        dag.write('\n<parallel>\n\n')
        for process in range(command_njobs):
            first = True
            for word in command:

                # Drop jobsub_submit-only options (role and server are supplied to
                # jobsub_submit_dag instead) and escape dollar signs.
                if word[:6] == 'jobsub':
                    word = 'jobsub'
                if word[:7] == '--role=':
                    word = ''
                if word.startswith('--jobsub-server='):
                    word = ''
                word = project_utilities.dollar_escape(word)
                if word != '':
                    if not first:
                        dag.write(' ')
                    dag.write(word)
                    first = False
            dag.write(' --process %d\n' % process)
        dag.write('\n</parallel>\n')
        # Write the stop-project section.
        if len(stop_commands) > 0:
            dag.write('\n<parallel>\n\n')
            for stop_command in stop_commands:
                first = True
                for word in stop_command:
                    if not first:
                        dag.write(' ')
                    if word[:6] == 'jobsub':
                        word = 'jobsub'
                    dag.write(word)
                    first = False
                dag.write('\n\n')
            dag.write('</parallel>\n')

        dag.write('\n</serial>\n')
        dag.close()
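        # Illustrative sketch (added; commands abbreviated, names hypothetical): the
        # generated submit.dag uses the dagNabbit layout
        #
        #   <serial>
        #   <parallel>
        #   jobsub ... start-<workname> --sam_station ... --sam_project <prjname>
        #   </parallel>
        #   <parallel>
        #   jobsub ... <workname> ... --process 0
        #   jobsub ... <workname> ... --process 1
        #   </parallel>
        #   <parallel>
        #   jobsub ... stop-<workname> --sam_project <prjname>
        #   </parallel>
        #   </serial>
        #
        # so the sam project is started before, and stopped after, the worker jobs.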
        # Replace the plain submit command with a jobsub_submit_dag command.
        command = ['jobsub_submit_dag']
        command.append('--group=%s' % project_utilities.get_experiment())
        if project.server != '-' and project.server != '':
            command.append('--jobsub-server=%s' % project.server)
        command.append('--role=%s' % role)
        dagfileurl = 'file://' + dagfilepath
        command.append(dagfileurl)

    checked_file = os.path.join(stage.bookdir, 'checked')
    # Calculate the submission timeout.
    submit_timeout = 3600000
    submit_timeout += 1.0 * command_njobs
    if stage.jobsub_timeout > submit_timeout:
        submit_timeout = stage.jobsub_timeout

    # Submit jobs.
    jobid = ''
    if not makeup:

        # Normal submission: run jobsub_submit (or jobsub_submit_dag) in a thread so
        # that a hung submission can be timed out.
        print('Invoke jobsub_submit')
        q = queue.Queue()
        jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   universal_newlines=True)
        thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=submit_timeout)
        if thread.is_alive():
            jobinfo.terminate()
            thread.join()

        # wait_for_subprocess is assumed to put the return code, stdout, and stderr on
        # the queue in that order.
        rc = q.get()
        jobout = q.get()
        joberr = q.get()

        if rc != 0:

            # Failed submission: clean up the checked file and temporary directories.
            if larbatch_posix.exists(checked_file):
                larbatch_posix.remove(checked_file)
            if larbatch_posix.isdir(tmpdir):
                larbatch_posix.rmtree(tmpdir)
            if larbatch_posix.isdir(tmpworkdir):
                larbatch_posix.rmtree(tmpworkdir)
            raise JobsubError(command, rc, jobout, joberr)

        # Extract the job id from the jobsub output.
        for line in jobout.split('\n'):
            if "JobsubJobId" in line:
                jobid = line.strip().split()[-1]
        if jobid == '':
            raise JobsubError(command, rc, jobout, joberr)
        print('jobsub_submit finished.')
    else:

        # Makeup submission: only submit if there is something to make up.
        if makeup_count > 0:
            q = queue.Queue()
            jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                       universal_newlines=True)
            thread = threading.Thread(target=project_utilities.wait_for_subprocess,
                                      args=[jobinfo, q])
            thread.start()
            thread.join(timeout=submit_timeout)
            if thread.is_alive():
                jobinfo.terminate()
                thread.join()
            rc = q.get()
            jobout = q.get()
            joberr = q.get()

            if rc != 0:
                if larbatch_posix.exists(checked_file):
                    larbatch_posix.remove(checked_file)
                if larbatch_posix.isdir(tmpdir):
                    larbatch_posix.rmtree(tmpdir)
                if larbatch_posix.isdir(tmpworkdir):
                    larbatch_posix.rmtree(tmpworkdir)
                raise JobsubError(command, rc, jobout, joberr)
            for line in jobout.split('\n'):
                if "JobsubJobId" in line:
                    jobid = line.strip().split()[-1]
            if jobid == '':
                raise JobsubError(command, rc, jobout, joberr)
        else:
            print('Makeup action aborted because makeup job count is zero.')
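    # Note (added; output format assumed): the job id is taken from a jobsub output
    # line of the form "JobsubJobId of first job: 12345678.0@jobsub0N.fnal.gov", i.e.
    # the last whitespace-separated token on the line containing "JobsubJobId".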