project.py
1 #! /usr/bin/env python
2 ######################################################################
3 #
4 # Name: project.py
5 #
6 # Purpose: Production project script.
7 #
8 # Created: 11-Sep-2012 Herbert Greenlee
9 #
10 # Usage:
11 #
12 # project.py <options>
13 #
14 # Project options:
15 #
16 # --xml <-|file|url> - Xml file containing project description.
17 # --project <project> - Project name (required if xml file contains
18 # more than one project description).
19 # --stage <stage>[,<stage2>,...] - Project stage(s) (required if project
20 # contains more than one stage).
21 # --tmpdir <tempdir> - Override TMPDIR internally. If TMPDIR is set,
22 # use ifdh cp instead of xrootd for accessing
23 # content of root files in dCache.
24 #
25 # XML stage configuration overrides. Note that most of these options, except --inputdef,
26 # are passed directly to jobsub_submit. Refer to jobsub_submit documentation for
27 # additional details.
28 #
29 # --lines <arg> - Override stage element <lines>...</lines>.
30 # --site <site> - Override stage element <site>...</site> (comma-separated list).
31 # --cpu <ncpus> - Override stage element <cpu>...</cpu> (integer # of cpus).
32 # --disk <disk> - Override stage element <disk>...</disk> (value and unit).
33 # --memory <mem> - Override stage element <memory>...</memory> (MB).
34 # --inputdef <defname> - Override stage element <inputdef>...</inputdef>. Any existing
35 # element of type <inputfile> or <inputlist> is nullified.
36 #
37 # Pubs options (combine with any action option).
38 #
39 # --pubs <run> <subrun> [<version>] - Modifies selected stage to specify pubs mode.
40 # The <subrun> can be a range or comma-separated
41 # list of subruns.
42 #
43 # Actions (specify one):
44 #
45 # [-h|--help] - Print help (this message).
46 # [-xh|--xmlhelp] - Print xml help.
47 # --submit - Submit all jobs for specified stage.
48 # --recur - Input dataset is recursive. Used in conjunction with --submit,
49 # allows job submission even if output directories are not empty.
50 # Also forces a new snapshot in case of input from sam.
51 # --check - Check results for specified stage and print message.
52 # --checkana - Check analysis results for specified stage and print message.
53 # --shorten - Shorten root filenames to have fewer than 200 characters.
54 # --fetchlog - Fetch jobsub logfiles (jobsub_fetchlog).
55 # --mergehist - Merge histogram files using hadd -T.
56 # --mergentuple - Merge ntuple files using hadd.
57 # --merge - Merge non-ART root files using the merging program specified in the XML file
58 # (default hadd -T).
59 # --status - Print status of each stage.
60 # --makeup - Submit makeup jobs for specified stage.
61 # --clean - Delete output from specified project and stage and following stages.
62 # --clean_one - Delete output from specified project and stage (don't clean following stages).
63 # --declare - Declare files to sam.
64 # --add_locations - Check sam disk locations and add missing ones.
65 # --clean_locations - Check sam disk locations and remove non-existent ones.
66 # --remove_locations - Remove all sam disk locations, whether or not file exists.
67 # --upload - Upload files to enstore.
68 # --define - Make sam dataset definition.
69 # --undefine - Delete sam dataset definition.
70 # --audit - Compare input files to output files and look for extra
71 # or missing files, and take subsequent action.
72 #
73 # --declare_ana - Declare analysis files to sam.
74 # --add_locations_ana - Check sam analysis file disk locations and add missing ones.
75 # --clean_locations_ana - Check analysis file sam disk locations and remove non-existent ones.
76 # --remove_locations_ana - Remove all analysis sam disk locations, whether or not file exists.
77 # --upload_ana - Upload analysis files to enstore.
78 # --define_ana - Make sam dataset definition for analysis files.
79 #
80 # Information only actions:
81 #
82 # --dump_project - Dump project object (dumps all stages).
83 # --dump_stage - Dump stage object.
84 # --dryrun - When combined with --submit or --makeup, do prep and show submit command,
85 # but don't submit jobs.
86 # --nocheck - Parse xml with reduced checks. This is mainly useful when
87 # combined with one of the dump options.
88 #
89 # --outdir - Print the name of the output directory for stage.
90 # --logdir - Print the name of the log directory for stage.
91 # --workdir - Print the name of the work directory for stage.
92 # --bookdir - Print the name of the bookkeeping directory for stage.
93 # --fcl - Print the fcl file name and version for stage.
94 # --defname - Print sam dataset definition name for stage.
95 # --input_files - Print all input files.
96 # --check_submit - Run presubmission check script, if any.
97 # --check_input - Do all standard input file checks.
98 #
99 # --check_declarations - Check whether data files are declared to sam.
100 # --test_declarations - Print a summary of files returned by sam query.
101 # --check_locations - Check sam locations and report the following:
102 # a) Files that lack any location.
103 # b) Disk locations that can be added.
104 # c) Incorrect disk locations that should be removed.
105 # --check_tape - Check sam tape locations.
106 # Reports any files that lack tape (enstore) locations.
107 # --check_definition - Reports whether the sam dataset definition associated
108 # with this project/stage exists, or needs to be created.
109 # --test_definition - Print a summary of files returned by dataset definition.
110 #
111 # --check_declarations_ana - Check whether analysis files are declared to sam.
112 # --test_declarations_ana - Print a summary of analysis files returned by sam query.
113 # --check_locations_ana - Check sam locations for analysis files and report the
114 # following:
115 # a) Files that lack any location.
116 # b) Disk locations that can be added.
117 # c) Incorrect disk locations that should be removed.
118 # --check_tape_ana - Check analysis file sam tape locations.
119 # Reports any files that lack tape (enstore) locations.
120 # --check_definition_ana - Reports whether the sam analysis dataset definition
121 # associated with this project/stage exists, or needs to
122 # be created.
123 # --test_definition_ana - Print a summary of files returned by analysis dataset
124 # definition.
125 #
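# Example invocations (illustrative only; "my_project.xml", "myproject", and
# "gen" are placeholder names):
#
#     project.py --xml my_project.xml --project myproject --stage gen --submit
#     project.py --xml my_project.xml --project myproject --stage gen --check
#     project.py --xml my_project.xml --status
#     project.py --xml my_project.xml --stage gen --pubs 1234 5,6,7 --submit
#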
126 ######################################################################
127 #
128 # XML file structure
129 # ------------------
130 #
131 # The xml file must contain one or more elements with tag "project."
132 #
133 # The project element must have attribute "name."
134 #
135 # The following element tags within the project element are recognized.
136 #
137 # <numevents> - Total number of events (required).
138 # <numjobs> - Number of worker jobs (default 1). This value can be
139 # overridden for individual stages by <stage><numjobs>.
140 # <maxfilesperjob> - Maximum number of files to deliver to a single job.
141 # Useful in case you want to limit output file size or keep a
142 # 1 -> 1 correlation between input and output. Can be overridden
143 # by <stage><maxfilesperjob>.
144 # <ups> - Override top level ups products (repeatable).
145 # <os> - Specify batch OS (comma-separated list: SL5,SL6).
146 # Default is to let jobsub decide.
147 #
148 # If the singularity flag is false, this option is passed directly as
149 # jobsub_submit option --OS.
150 #
151 # If singularity flag is true, this option is used to specify the
152 # singularity image, passed via jobsub_submit --lines.
153 #
154 # A singularity image can be specified as an absolute or relative path
155 # of the image file, or as an alias, such as sl6, sl7, el8.
156 # A singularity image alias can be upper or lower case.
157 # The alias selects an image file in
158 # directory /cvmfs/singularity.opensciencegrid.org/fermilab.
159 #
160 # <server> - Jobsub server (expert option, jobsub_submit --jobsub-server=...).
161 # If "" (blank), "-" (hyphen), or missing, omit --jobsub-server
162 # option (use default server).
163 # <resource> - Jobsub resources (comma-separated list: DEDICATED,OPPORTUNISTIC,
164 # OFFSITE,FERMICLOUD,PAID_CLOUD,FERMICLOUD8G).
165 # Default: DEDICATED,OPPORTUNISTIC.
166 # <role> - Role (normally Analysis or Production). This element overrides the
167 # default role-determining algorithm in larbatch_utilities.get_role().
168 # <lines> - Arbitrary condor commands (expert option, jobsub_submit --lines=...).
169 # <site> - Specify sites (comma-separated list, default jobsub decides).
170 # <blacklist> - Blacklist sites (comma-separated list, default jobsub decides).
171 #
172 # <cpu> - Number of cpus (jobsub_submit --cpu=...).
173 # <disk> - Amount of scratch disk space (jobsub_submit --disk=...).
174 # Specify value and unit (e.g. 50GB).
175 # <memory> - Specify amount of memory in MB (jobsub_submit --memory=...).
176 #
177 # <script> - Name of batch worker script (default condor_lar.sh).
178 # The batch script must be on the execution path.
179 # <startscript> - Name of batch worker start project script (default condor_start_project.sh)
180 # Must be on execution path.
181 # <stopscript> - Name of batch worker stop project script (default condor_stop_project.sh)
182 # Must be on execution path.
183 #
184 #
185 # <larsoft> - Information about larsoft release.
186 # <larsoft><tag> - Frozen release tag (default "development").
187 # <larsoft><qual> - Build qualifier, e.g. "debug" or "prof" (default "debug").
188 # <larsoft><local> - Local test release directory or tarball (default none).
189 # <version> - Specify project version (default same as <larsoft><tag>).
190 #
191 # <filetype> - Sam file type ("data" or "mc", default none).
192 # <runtype> - Sam run type (normally "physics", default none).
193 # <runnumber> - Sam run number (default none).
194 # <parameter name="parametername"> - Specify experiment-specific metadata parameters
195 #
196 # <merge> - Special histogram merging program (default "hadd -T",
197 # can be overridden at each stage).
198 # Set to "1" to generate merging metadata for artroot files.
199 # <anamerge> - Set to "1" to generate merging metadata for analysis files.
200 #
201 # <check> - Do on-node validation and sam declaration (0 or 1, default 0).
202 # <copy> - Copy validated root files to FTS (0 or 1, default 0).
203 # <cvmfs> - Cvmfs flag (0 or 1, default 1). If nonzero, add option
204 # "--append_condor_requirements='(TARGET.HAS_CVMFS_<experiment>_opensciencegrid_org==true)'"
205 # <stash> - Stash cache flag (0 or 1, default 1). If nonzero, add option
206 # "--append_condor_requirements='(TARGET.HAS_CVMFS_<experiment>_osgstorage_org==true)'"
207 # <singularity> - Singularity flag (0 or 1, default 1).
208 #
209 # <stage name="stagename" base="basestage"> - Information about project stage.
210 # There can be multiple instances of this tag with different name
211 # attributes. The name attribute is optional if there is
212 # only one project stage. The base attribute is also optional.
213 # If present, it specifies a "base stage" which supplies default
214 # values for all unspecified xml tags.
215 # <stage><batchname> - If present and not empty, override default batch job name.
216 # <stage><fcl> - Name of fcl file (required).
217 # Search $FHICL_FILE_PATH, <fcldir>, or specify full path.
218 # Repeatable.
219 # See below for additional information about multiple fcls (substages).
220 # <stage><outdir> - Output directory (required). A subdirectory with the
221 # project name is created underneath this directory. Individual
222 # batch workers create an additional subdirectory under that with
223 # names like <cluster>_<process>. Output data (root) files
224 # generated by batch jobs are stored in this directory. This
225 # directory should be grid-accessible.
226 # <stage><logdir> - Log directory (optional). If not specified, default to
227 # be the same as the output directory. A directory structure
228 # is created under the log directory similar to the one
229 # under the output directory. Non-data (non-root, usually small)
230 # files generated by batch jobs are stored in this directory.
231 # This directory should be grid-accessible.
232 # <stage><workdir> - Work directory (required). This directory acts as the
233 # submission directory for the batch job. Fcl file, batch
234 # script, and input file list are copied here. A subdirectory with
235 # the name of the project, plus "/work", is appended to this path.
236 # This directory should be grid-accessible.
237 # <stage><bookdir> - Bookkeeping directory (optional). If not specified, default
238 # to be the same as the log directory. A directory
239 # structure is created under the bookkeeping directory similar
240 # to the one under the output directory. This directory is used
241 # to store bookkeeping files generated by this script. It does not
242 # need to be grid-accessible. Ideally the bookkeeping directory
243 # should be on a local disk.
244 # <stage><dirsize> - Specify maximum directory size. No effect unless <dirlevels>
245 # is greater than zero.
246 # <stage><dirlevels> - Specify number of extra directory levels (default 0).
247 # <stage><inputfile> - Specify a single input file (full path). The number
248 # of batch jobs must be one.
249 # <stage><inputlist> - Specify input file list (a file containing a list
250 # of input files, one per line, full path).
251 # <stage><inputmode> - Specify input file type. Default is none, which means
252 # art root file. Alternative is textfile.
253 # <stage><inputdef> - Specify input sam dataset definition.
254 #
255 # It is optional to specify an input file or input list (Monte
256 # Carlo generation doesn't need it, obviously). It is also
257 # optional for later production stages. If no input is specified,
258 # the list of files produced by the previous production stage
259 # (if any) will be used as input to the current production stage
260 # (must have been checked using option --check).
261 # <stage><inputstream> - Specify input stream. The only effect of this
262 # parameter is to change the default input file list name from
263 # "files.list" to "files_<inputstream>.list." This parameter has
264 # no effect if any non-default input is specified.
265 # <stage><previousstage> - Specify the previous stage name to be something other
266 # than the immediate predecessor stage specified in the xml file.
267 # This parameter only affects the default input file list. This
268 # parameter has no effect if any non-default input is specified.
269 # Specify as "none" (or any nonexistent stage) to prevent generation
270 # of any default input (i.e. for noninitial generator stages).
271 # <stage><filelistdef> - Evaluate input sam definition using separate queries
272 # (may reduce load on sam database).
273 # <stage><mixinputdef> - Specify mix input from a sam dataset.
274 # <stage><pubsinput> - 0 (false) or 1 (true). If true, modify input file list
275 # for specific (run, subrun, version) in pubs mode. Default is true.
276 # <stage><maxfluxfilemb> - Specify GENIEHelper fcl parameter MaxFluxFileMB (default 500).
277 # Specifying this parameter as 0 will inhibit genie flux fcl
278 # overrides, which may be useful for non-genie generators.
279 #
280 #
281 # <stage><recur> - Recursive flag (0 or 1). Same as command line option --recur.
282 # <stage><recurdef> - Specify recursive (aka draining) input dataset name. Can be
283 # a predefined dataset definition or project.py can define it
284 # for you.
285 #
286 # The dataset specified by <recurdef> is used as input in preference
287 # to <inputdef> (if specified).
288 #
289 # This element also implicitly sets the recursive flag.
290 #
291 # If you want project.py to create a recursive dataset definition
292 # for you, specify both <recurdef> and <inputdef>. Then
293 # project.py will create a dataset definition (if one doesn't exist)
294 # using <inputdef> as base, and adding optional "minus" and/or
295 # "with limit" clause in the sam dimension.
296 #
297 # <stage><recurtype> - Specify the type of minus clause to use in an automatically
298 # generated recursive dataset definition. If this element is
299 # missing, the generated dataset definition will not include
300 # a minus clause.
301 #
302 # Allowed values are:
303 #
304 # none - Don't generate a minus clause.
305 # snapshot - "minus snapshot_for_project_name ...".
306 # consumed - "minus (project_name ... and consumed_status consumed)"
307 # child - "minus isparentof: ( ... )" using artroot data tier.
308 # anachild - "minus isparentof: ( ... )" using analysis data tier.
309 #
310 # <stage><recurlimit> - Specify an integer value for "with limit" clause. If this
311 # element is missing or the value is zero, the generated dataset
312 # definition will not include a "with limit" clause.
313 #
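# As an illustration only (the exact dimension string is constructed internally
# and may differ in detail), a recursive definition generated from
# <inputdef>my_input_def</inputdef> with <recurtype>snapshot</recurtype> and
# <recurlimit>50</recurlimit> would be based on a sam dimension resembling:
#
#     defname: my_input_def minus snapshot_for_project_name my_project_stage1 with limit 50
#
# Here my_input_def and my_project_stage1 are placeholders.
#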
314 # <stage><singlerun> - Single run flag. If nonzero, limit input to come from a single
315 # run. The run is based on a randomly selected file.
316 #
317 # <stage><prestart> - Prestart flag. If specified and nonzero, start the sam project
318 # in this script, instead of in a batch job.
319 #
320 # <stage><activebase> - If this element is present and nonempty, define or update an
321 # active projects dataset "<activebase>_active,"
322 # where <activebase> is the value of this element.
323 # Do this in the input checking phase (e.g. prior to job submission)
324 # in function stagedef.checkinput.
325 #
326 # <stage><dropboxwait> - If this element is present as well as <activebase>, specify a
327 # dropbox waiting interval. Specify as floating point days.
328 # Create dataset "<activebase>_wait."
329 #
330 # <stage><prestagefraction> - This parameter should be a floating point number between
331 # 0 and 1 (default 0). If nonzero, the separate batch job that
332 # starts the sam project (if any) will prestage at least the
333 # specified fraction of files from the input sam project before
334 # exiting.
335 #
336 # <stage><ana> - Analysis flag (0 or 1, default 0). Setting this flag to 1
337 # informs project.py that this stage does not contain a RootOutput
338 # module, and not to expect any artroot output file. This flag
339 # effectively converts command line action options to the
340 # analysis equivalent (e.g. --check acts like --checkana).
341 #
342 # <stage><numjobs> - Number of worker jobs (default 1).
343 # <stage><numevents> - Number of events (override project level number of events).
344 # <stage><maxfilesperjob> - Maximum number of files to deliver to a single job.
345 # Useful in case you want to limit output file size or keep a
346 # 1 -> 1 correlation between input and output.
347 # <stage><targetsize> - Specify target size for input files. If specified,
348 # this attribute may override <numjobs> in the downward
349 # direction (i.e. <numjobs> is the maximum number of jobs).
350 # <stage><defname> - Sam output dataset definition name (default none).
351 # <stage><anadefname> - Sam analysis output dataset definition name (default none).
352 # <stage><datatier> - Sam data tier (default none).
353 # <stage><datastream> - Sam data stream (default none).
354 # <stage><anadatatier> - Sam analysis data tier (default none).
355 # <stage><anadatastream> - Sam analysis data stream (default none).
356 # <stage><submitscript> - Presubmission check script. Must be on execution path.
357 # If this script exits with nonzero exit status, job submission
358 # is aborted.
359 # <stage><initscript> - Worker initialization script (condor_lar.sh --init-script). Repeatable.
360 # <stage><initsource> - Worker initialization bash source script (condor_lar.sh --init-source).
361 # <stage><endscript> - Worker finalization script (condor_lar.sh --end-script). Repeatable.
362 # <stage><merge> - Name of special histogram merging program or script (default "hadd -T",
363 # can be overridden at each stage).
364 # Set to "1" to generate merging metadata for artroot files.
365 # <stage><anamerge> - Set to "1" to generate merging metadata for analysis files.
366 # <stage><resource> - Jobsub resources (comma-separated list: DEDICATED,OPPORTUNISTIC,
367 # OFFSITE,FERMICLOUD,PAID_CLOUD,FERMICLOUD8G).
368 # Default: DEDICATED,OPPORTUNISTIC.
369 # <stage><lines> - Arbitrary condor commands (expert option, jobsub_submit --lines=...).
370 # <stage><site> - Specify sites (default jobsub decides).
371 # <stage><blacklist> - Blacklist sites (default jobsub decides).
372 # <stage><cpu> - Number of cpus (jobsub_submit --cpu=...).
373 # <stage><disk> - Amount of scratch disk space (jobsub_submit --disk=...).
374 # Specify value and unit (e.g. 50GB).
375 # <stage><memory> - Specify amount of memory in MB (jobsub_submit --memory=...).
376 # <stage><script> - Name of batch worker script (default condor_lar.sh).
377 # The batch script must be on the execution path.
378 # <stage><startscript> - Name of batch worker start project script (default condor_start_project.sh)
379 # Must be on execution path.
380 # <stage><stopscript> - Name of batch worker stop project script (default condor_stop_project.sh)
381 # Must be on execution path.
382 # <stage><output> - Specify output file name. Can also appear in fcl substages (see below).
383 # <stage><datafiletypes> - Specify file types that should be considered as data and
384 # saved in batch jobs (comma-separated list). Default "root".
385 # <stage><TFileName> - Ability to specify unique output TFile Name
386 # (Required when generating Metadata for TFiles)
387 # <stage><jobsub> - Arbitrary jobsub_submit option(s). Space-separated list.
388 # Only applies to main worker submission, not sam start/stop
389 # project submissions.
390 # <stage><jobsub_start> - Arbitrary jobsub_submit option(s). Space-separated list.
391 # Applies to sam start/stop project submissions.
392 # <stage><jobsub_timeout> - Job submission timeout (seconds).
393 # <stage><maxfilesperjob> - Maximum number of files to be processed in a single worker.
394 # <stage><exe> - Executable (default "lar"). Can also appear in fcl substages (see below).
395 # <stage><schema> - Sam schema (default none). Use "root" to stream using xrootd.
396 # <stage><check> - Do on-node validation and sam declaration (0 or 1, default 0).
397 # <stage><copy> - Copy validated root files to FTS (0 or 1, default 0).
398 # <stage><cvmfs> - Cvmfs flag (0 or 1, default 1). If nonzero, add option
399 # "--append_condor_requirements='(TARGET.HAS_CVMFS_<experiment>_opensciencegrid_org==true)'"
400 # <stage><stash> - Stash cache flag (0 or 1, default 1). If nonzero, add option
401 # "--append_condor_requirements='(TARGET.HAS_CVMFS_<experiment>_osgstorage_org==true)'"
402 # <stage><singularity> - Singularity flag (0 or 1, default 1).
403 #
404 # Batch job substages.
405 #
406 # Batch jobs can have multiple substages. The number of substages equals the number
407 # of <fcl> elements. Each <fcl> element triggers the execution of a different executable
408 # within a single batch job. Some aspects of the environment are tunable within each
409 # substage by specifying additional subelements within each <fcl> element.
410 #
411 # <stage><fcl> - Name of fcl file. This should come first within each <fcl> element
412 # before additional substage subelements.
413 # <stage><fcl><initsource> - Initialization source script for this substage.
414 # <stage><fcl><endscript> - Finalization script for this substage.
415 # <stage><fcl><exe> - Executable to use in this substage (default "lar").
416 # <stage><fcl><output> - Output file name for this substage.
417 # <stage><fcl><projectname> - Override project name for this substage.
418 # <stage><fcl><stagename> - Override stage name for this substage.
419 # <stage><fcl><version> - Override project version for this substage.
420 #
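# A minimal sketch of a stage with two substages (file and element values such
# as gen.fcl, g4.fcl, and gen_g4.root are placeholders; other required stage
# elements are omitted):
#
#     <stage name="gen_g4">
#       <fcl>gen.fcl</fcl>
#       <fcl>
#         g4.fcl
#         <output>gen_g4.root</output>
#         <exe>lar</exe>
#       </fcl>
#       ...
#     </stage>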
421 #
422 # <fcldir> - Directory in which to search for fcl files (optional, repeatable).
423 # Fcl files are searched for in the following directories, in order.
424 # 1. Fcl directories specified using <fcldir>.
425 # 2. $FHICL_FILE_PATH.
426 # Regardless of where an fcl file is found, a copy is placed
427 # in the work directory before job submission. It is an
428 # error if the fcl file isn't found.
429 #
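# Putting the pieces above together, a skeleton xml file might look like the
# following (all values are placeholders and only a subset of the recognized
# elements is shown):
#
#     <?xml version="1.0"?>
#     <project name="myproject">
#       <numevents>10000</numevents>
#       <numjobs>100</numjobs>
#       <larsoft>
#         <tag>v09_00_00</tag>
#         <qual>e20:prof</qual>
#       </larsoft>
#       <filetype>mc</filetype>
#       <runtype>physics</runtype>
#       <stage name="gen">
#         <fcl>gen.fcl</fcl>
#         <outdir>/pnfs/myexp/scratch/myuser/gen/out</outdir>
#         <logdir>/pnfs/myexp/scratch/myuser/gen/log</logdir>
#         <workdir>/pnfs/myexp/scratch/myuser/gen/work</workdir>
#         <numjobs>100</numjobs>
#         <datatier>generated</datatier>
#       </stage>
#     </project>
#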
430 ######################################################################
431 #
432 # Bookkeeping Files
433 # -----------------
434 #
435 # Project.py maintains a set of bookkeeping files to record the state
436 # of project jobs. These bookkeeping files are stored in the log
437 # directory of each project stage (XML element <logdir>). Most of these
438 # files are generated or updated after a "check" operation (project.py --check).
439 #
440 # Here is a list of all bookkeeping files used by project.py:
441 #
442 # checked - A file with empty contents used to record the timestamp
443 # of the latest check for this stage.
444 # files.list - A list of all good art output files (full paths), one file
445 # per line.
446 # files_<data_stream>.list - A list of all good art output files in the
447 # specified stream.
448 # events.list - A list of all good art output files (full paths), one
449 # file per line, plus on the same line the number of events.
450 # filesana.list - A list of all good non-art output root files (full
451 # paths), one file per line.
452 # transferred_uris.list - A list of input files that were successfully
453 # processed. This file is a concatenated version of
454 # "transferred_uris.list" files from successful
455 # process subdirectories.
456 # missing_files.list - A list of input files that were not successfully
457 # processed.
458 # bad.list - A list of failed process subdirectories.
459 # sam_projects.list - A list of sam projects from successful processes.
460 # This file is a concatenated and sorted version of
461 # "sam_project.txt" files from successful process
462 # subdirectories.
463 # cpids.list - A list of successful sam consumer process ids. This file
464 # is a concatenated version of "cpid.txt" files from
465 # successful process subdirectories.
466 #
467 # jobids.list - A list of all submitted jobsub jobids.
468 #
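# As a minimal illustration (not part of this script), these bookkeeping files
# are plain text and can be inspected directly. For example, the total number
# of good events for a stage can be recomputed from "events.list" (the bookdir
# path below is a placeholder):
#
#     nev = 0
#     for line in open('/path/to/bookdir/events.list'):
#         words = line.split()
#         if len(words) >= 2:
#             nev += int(words[1])
#     print(nev)
#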
469 ######################################################################
470 
471 from __future__ import absolute_import
472 from __future__ import print_function
473 import sys, os, stat, subprocess, shutil, json, getpass, uuid, tempfile, hashlib
474 try:
475  import urllib.request as urlrequest
476 except ImportError:
477  import urllib as urlrequest
478 import larbatch_posix
479 import threading
480 try:
481  import queue
482 except ImportError:
483  import Queue as queue
484 from xml.dom.minidom import parse
485 import project_utilities, root_metadata
486 from project_modules.projectdef import ProjectDef
487 from project_modules.projectstatus import ProjectStatus
488 from project_modules.batchstatus import BatchStatus
489 from project_modules.jobsuberror import JobsubError
490 from project_modules.ifdherror import IFDHError
491 import larbatch_utilities
492 from larbatch_utilities import convert_str
493 from larbatch_utilities import convert_bytes
494 import samweb_cli
495 
496 samweb = None # Initialized SAMWebClient object
497 extractor_dict = None # Metadata extractor
498 proxy_ok = False
499 
500 # Function to make sure global SAMWebClient object is initialized.
501 # Also imports extractor_dict module.
502 # This function should be called before using samweb.
503 
504 def import_samweb():
505 
506  # Get initialized samweb, if not already done.
507 
508  global samweb
509  global extractor_dict
510  global expMetaData
511 
512 
513  if samweb == None:
514  samweb = project_utilities.samweb()
515  from extractor_dict import expMetaData
516 
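# Within this module, the helper above is used like this (sketch; the dataset
# definition name is a placeholder):
#
#     import_samweb()
#     files = samweb.listFiles(defname='my_dataset_definition')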
517 # Multi-project clean function.
518 
519 def docleanx(projects, projectname, stagename, clean_descendants = True):
520  print(projectname, stagename)
521 
522  # Loop over projects and stages.
523  # Clean all stages beginning with the specified project/stage.
524  # For empty project/stage name, clean all stages.
525  #
526  # For safety, only clean directories if the uid of the
527  # directory owner matches the current uid or effective uid.
528  # Do this even if the delete operation is allowed by filesystem
529  # permissions (directories may be group- or public-write
530  # because of batch system).
531 
532  uid = os.getuid()
533  euid = os.geteuid()
534  cleaned_bookdirs = []
535 
536  # Clean iteratively.
537 
538  done_cleaning = False
539  while not done_cleaning:
540 
541  cleaned_something = False
542 
543  # Loop over projects and stages.
544 
545  for project in projects:
546  for stage in project.stages:
547 
548  clean_this_stage = False
549 
550  # Skip this stage if it has already been cleaned.
551 
552  if not stage.bookdir in cleaned_bookdirs:
553 
554  # Determine if this is the first stage we want to clean.
555 
556  if (projectname == '' or project.name == projectname) and \
557  (stagename == '' or stage.name == stagename):
558 
559  clean_this_stage = True
560 
561  # Determine if we want to clean this stage because it uses
562  # an input filelist that lives in an already-cleaned bookdir.
563 
564  elif clean_descendants and stage.inputlist != '' and \
565  os.path.dirname(stage.inputlist) in cleaned_bookdirs:
566 
567  clean_this_stage = True
568 
569  # Do cleaning.
570 
571  if clean_this_stage:
572  cleaned_something = True
573  cleaned_bookdirs.append(stage.bookdir)
574 
575  print('Clean project %s, stage %s' % (project.name, stage.name))
576 
577  # Clean this stage outdir.
578 
579  if larbatch_posix.exists(stage.outdir):
580  dir_uid = larbatch_posix.stat(stage.outdir).st_uid
581  if dir_uid == uid or dir_uid == euid:
582  print('Clean directory %s.' % stage.outdir)
583  larbatch_posix.rmtree(stage.outdir)
584  else:
585  raise RuntimeError('Owner mismatch, delete %s manually.' % stage.outdir)
586 
587  # Clean this stage logdir.
588 
589  if larbatch_posix.exists(stage.logdir):
590  dir_uid = larbatch_posix.stat(stage.logdir).st_uid
591  if dir_uid == uid or dir_uid == euid:
592  print('Clean directory %s.' % stage.logdir)
593  larbatch_posix.rmtree(stage.logdir)
594  else:
595  raise RuntimeError('Owner mismatch, delete %s manually.' % stage.logdir)
596 
597  # Clean this stage workdir.
598 
599  if larbatch_posix.exists(stage.workdir):
600  dir_uid = larbatch_posix.stat(stage.workdir).st_uid
601  if dir_uid == uid or dir_uid == euid:
602  print('Clean directory %s.' % stage.workdir)
603  larbatch_posix.rmtree(stage.workdir)
604  else:
605  raise RuntimeError('Owner mismatch, delete %s manually.' % stage.workdir)
606 
607  # Clean this stage bookdir.
608 
609  if larbatch_posix.exists(stage.bookdir):
610  dir_uid = larbatch_posix.stat(stage.bookdir).st_uid
611  if dir_uid == uid or dir_uid == euid:
612  print('Clean directory %s.' % stage.bookdir)
613  larbatch_posix.rmtree(stage.bookdir)
614  else:
615  raise RuntimeError('Owner mismatch, delete %s manually.' % stage.bookdir)
616 
617  done_cleaning = not cleaned_something
618 
619  # Done.
620 
621  return
622 
623 # Stage status function.
624 
625 def dostatus(projects):
626 
627  # BatchStatus constructor requires authentication.
628 
629  project_utilities.test_kca()
630 
631  # For backward compatibility, allow this function to be called with
632  # either a single project or a list of projects.
633 
634  prjs = projects
635  if type(projects) != type([]) and type(projects) != type(()):
636  prjs = [projects]
637 
638  project_status = ProjectStatus(prjs)
639  batch_status = BatchStatus(prjs)
640 
641  for project in prjs:
642 
643  print('\nProject %s:' % project.name)
644 
645  # Loop over stages.
646 
647  for stage in project.stages:
648 
649  stagename = stage.name
650  stage_status = project_status.get_stage_status(stagename)
651  b_stage_status = batch_status.get_stage_status(stagename)
652  if stage_status.exists:
653  print('\nStage %s: %d art files, %d events, %d analysis files, %d errors, %d missing files.' % (
654  stagename, stage_status.nfile, stage_status.nev, stage_status.nana,
655  stage_status.nerror, stage_status.nmiss))
656  else:
657  print('\nStage %s output directory does not exist.' % stagename)
658  print('Stage %s batch jobs: %d idle, %d running, %d held, %d other.' % (
659  stagename, b_stage_status[0], b_stage_status[1], b_stage_status[2], b_stage_status[3]))
660  return
661 
662 
663 # Recursively extract projects from an xml element.
664 
665 def find_projects(element, check=True):
666 
667  projects = []
668 
669  # First check if the input element is a project. In that case, return a
670  # list containing the project name as the single element of the list.
671 
672  if element.nodeName == 'project':
673  default_input_by_stage = {}
674  project = ProjectDef(element, '', default_input_by_stage, check=check)
675  projects.append(project)
676 
677  else:
678 
679  # Input element is not a project.
680  # Loop over subelements.
681 
682  default_input = ''
683  default_input_by_stage = {}
684  subelements = element.getElementsByTagName('project')
685  for subelement in subelements:
686  project = ProjectDef(subelement, default_input, default_input_by_stage, check=check)
687  projects.append(project)
688  for stage in project.stages:
689  stage_list = os.path.join(stage.bookdir, 'files.list')
690  default_input_by_stage[stage.name] = stage_list
691  default_input = stage_list
692 
693  # Done.
694 
695  return projects
696 
697 
698 # Extract all projects from the specified xml file.
699 
700 def get_projects(xmlfile, check=True):
701 
702  # Cache results.
703 
704  if xmlfile in get_projects.cache:
705  return get_projects.cache[xmlfile]
706 
707  # Parse xml (returns xml document).
708 
709  if xmlfile == '-':
710  xml = sys.stdin
711  elif xmlfile.find(':') < 0:
712  xml = open(xmlfile)
713  else:
714  xml = urlrequest.urlopen(xmlfile)
715  doc = parse(xml)
716 
717  # Extract root element.
718 
719  root = doc.documentElement
720 
721  # Find project names in the root element.
722 
723  projects = find_projects(root, check=check)
724 
725  # Cache result.
726 
727  get_projects.cache[xmlfile] = projects
728 
729  # Done.
730 
731  return projects
732 
733 # Get_projects result cache.
734 
735 get_projects.cache = {}
736 
737 
738 # Select the specified project.
739 
740 def select_project(projects, projectname, stagename):
741 
742  for project in projects:
743  if projectname == '' or projectname == project.name:
744  for stage in project.stages:
745  if stagename == '' or stagename == stage.name:
746  return project
747 
748  # Failure if we fall out of the loop.
749 
750  return None
751 
752 
753 # Extract the specified project element from xml file.
754 
755 def get_project(xmlfile, projectname='', stagename='', check=True):
756  projects = get_projects(xmlfile, check=check)
757  project = select_project(projects, projectname, stagename)
758  return project
759 
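# Illustrative use of the functions above from an interactive python session
# (the xml file name and project/stage names are placeholders):
#
#     import project
#     prj = project.get_project('my_project.xml', projectname='myproject', stagename='gen')
#     stg = prj.get_stage('gen')
#     print(stg.outdir)
#     project.dostatus([prj])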
760 # Extract the next sequential stage
761 
762 def next_stage(projects, stagename, circular=False):
763 
764  # Loop over projects.
765 
766  found = False
767  for project in projects:
768 
769  # Loop over stages.
770 
771  for stage in project.stages:
772  if found:
773  return stage
774  if stage.name == stagename:
775  found = True
776 
777  # Circular mode: Choose first stage if we fell out of the loop.
778 
779  if circular and len(projects) > 0 and len(projects[0].stages) > 0:
780  return projects[0].stages[0]
781 
782  # Finally return None if we didn't find anything appropriate.
783 
784  return None
785 
786 # Extract the previous sequential stage.
787 
788 def previous_stage(projects, stagename, circular=False):
789 
790  # Initialize result None or last stage (if circular).
791 
792  result = None
793  if circular and len(projects) > 0 and len(projects[-1].stages) > 0:
794  result = projects[-1].stages[-1]
795 
796  # Loop over projects.
797 
798  for project in projects:
799 
800  # Loop over stages.
801 
802  for stage in project.stages:
803  if stage.name == stagename:
804  return result
805  result = stage
806 
807  # Return default answer if we fell out of the loop.
808 
809  return result
810 
811 # Extract pubsified stage from xml file.
812 # Return value is a 2-tuple (project, stage).
813 
814 def get_pubs_stage(xmlfile, projectname, stagename, run, subruns, version=None):
815  projects = get_projects(xmlfile)
816  project = select_project(projects, projectname, stagename)
817  if project == None:
818  raise RuntimeError('No project selected for projectname=%s, stagename=%s' % (
819  projectname, stagename))
820  stage = project.get_stage(stagename)
821  if stage == None:
822  raise RuntimeError('No stage selected for projectname=%s, stagename=%s' % (
823  projectname, stagename))
824  get_projects.cache = {}
825  stage.pubsify_input(run, subruns, version)
826  stage.pubsify_output(run, subruns, version)
827  get_projects.cache = {}
828  return project, stage
829 
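# Illustrative call (run and subrun values are placeholders; the subruns are
# assumed to be given as a list):
#
#     prj, stg = get_pubs_stage('my_project.xml', 'myproject', 'gen', 1234, [5, 6, 7])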
830 
831 # Check a single root file.
832 # Returns a 2-tuple containing the number of events and stream name.
833 # The number of events conveys the following information:
834 # 1. Number of events (>=0) in TTree named "Events."
835 # 2. -1 if root file does not contain an Events TTree, but is otherwise valid (openable).
836 # 3. -2 for error (root file does not exist or is not openable).
837 
838 def check_root_file(path, logdir):
839 
840  global proxy_ok
841  result = (-2, '')
842  json_ok = False
843  md = []
844 
845  # First check if root file exists (error if not).
846 
847  if not larbatch_posix.exists(path):
848  return result
849 
850  # See if we have precalculated metadata for this root file.
851 
852  json_path = os.path.join(logdir, os.path.basename(path) + '.json')
853  if larbatch_posix.exists(json_path):
854 
855  # Get number of events from precalculated metadata.
856 
857  try:
858  lines = larbatch_posix.readlines(json_path)
859  s = ''
860  for line in lines:
861  s = s + line
862 
863  # Convert json string to python dictionary.
864 
865  md = json.loads(s)
866 
867  # If we get this far, say the file was at least openable.
868 
869  result = (-1, '')
870 
871  # Extract number of events and stream name from metadata.
872 
873  if len(list(md.keys())) > 0:
874  nevroot = -1
875  stream = ''
876  if 'events' in md:
877  nevroot = int(md['events'])
878  if 'data_stream' in md:
879  stream = md['data_stream']
880  result = (nevroot, stream)
881  json_ok = True
882  except:
883  result = (-2, '')
884  return result
885 
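# For illustration, the return value of check_root_file can be interpreted
# roughly as follows (sketch only):
#
#     nev, stream = check_root_file(path, logdir)
#     if nev >= 0:
#         pass    # art root file with nev events in its Events TTree
#     elif nev == -1:
#         pass    # valid (openable) root file without an Events TTree
#     else:
#         pass    # nev == -2: file missing or not openable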
886 
887 # Check data files in the specified directory.
888 
889 def check_root(outdir, logdir, data_file_types):
890 
891  # This method looks for files with file types matching data_file_types.
892  # If such files are found, it also checks for the existence of
893  # an Events TTree.
894  #
895  # Returns a 3-tuple containing the following information.
896  # 1. Total number of events in art root files.
897  # 2. A list of 3-tuples with an entry for each art root file.
898  # The 3-tuple contains the following information.
899  # a) Filename (full path).
900  # b) Number of events
901  # c) Stream name.
902  # 3. A list of histogram root files.
903 
904  nev = -1
905  roots = []
906  hists = []
907 
908  print('Checking root files in directory %s.' % outdir)
909  filenames = larbatch_posix.listdir(outdir)
910  for filename in filenames:
911  name, ext = os.path.splitext(filename)
912  if len(ext) > 0 and ext[1:] in data_file_types:
913  path = os.path.join(outdir, filename)
914  nevroot, stream = check_root_file(path, logdir)
915  if nevroot >= 0:
916  if nev < 0:
917  nev = 0
918  nev = nev + nevroot
919  roots.append((os.path.join(outdir, filename), nevroot, stream))
920 
921  elif nevroot == -1:
922 
923  # Valid data file, not an art root file.
924 
925  hists.append(os.path.join(outdir, filename))
926 
927  else:
928 
929  # Found a .root file that is not openable.
930  # Print a warning, but don't trigger any other error.
931 
932  print('Warning: File %s in directory %s is not a valid root file.' % (filename, outdir))
933 
934  # Done.
935 
936  return (nev, roots, hists)
937 
938 
939 # Get the list of input files for a project stage.
940 
941 def get_input_files(stage):
942 
943  # In case of single file or file list input, files are returned exactly
944 # as specified, which would normally be as the full path.
945  # In case of sam input, only the file names are returned (guaranteed unique).
946 
947  result = []
948  if stage.inputfile != '':
949  result.append(stage.inputfile)
950 
951  elif stage.inputlist != '' and larbatch_posix.exists(stage.inputlist):
952  try:
953  input_filenames = larbatch_posix.readlines(stage.inputlist)
954  for line in input_filenames:
955  words = line.split()
956  result.append(words[0])
957  except:
958  pass
959 
960  elif stage.inputdef != '':
961  import_samweb()
962  result = samweb.listFiles(defname=stage.inputdef)
963 
964  # Done.
965 
966  return result
967 
968 # Shorten root file names to have fewer than 200 characters.
969 
970 def doshorten(stage):
971 
972  # Untar log files.
973 
974  untarlog(stage)
975 
976  # Loop over .root files in outdir.
977 
978  for out_subpath, subdirs, files in larbatch_posix.walk(stage.outdir):
979 
980  # Only examine files in leaf directories.
981 
982  if len(subdirs) != 0:
983  continue
984 
985  subdir = os.path.relpath(out_subpath, stage.outdir)
986  log_subpath = os.path.join(stage.bookdir, subdir)
987 
988  for file in files:
989  if file[-5:] == '.root':
990  if len(file) >= 200:
991 
992  # Long filenames renamed here.
993 
994  file_path = os.path.join(out_subpath, file)
995  shortfile = file[:150] + str(uuid.uuid4()) + '.root'
996  shortfile_path = os.path.join(out_subpath, shortfile)
997  print('%s\n->%s\n' % (file_path, shortfile_path))
998  larbatch_posix.rename(file_path, shortfile_path)
999 
1000  # Also rename corresponding json file, if it exists.
1001 
1002  json_path = os.path.join(log_subpath, file + '.json')
1003  if larbatch_posix.exists(json_path):
1004  shortjson = shortfile + '.json'
1005  shortjson_path = os.path.join(log_subpath, shortjson)
1006  print('%s\n->%s\n' % (json_path, shortjson_path))
1007  larbatch_posix.rename(json_path, shortjson_path)
1008 
1009  return
1010 
1011 # Untar tarred up log files in logdir into bookdir.
1012 
1013 def untarlog(stage):
1014 
1015  # Walk over logdir to look for log files.
1016 
1017  for log_subpath, subdirs, files in larbatch_posix.walk(stage.logdir):
1018 
1019  # Only examine leaf directories.
1020 
1021  if len(subdirs) != 0:
1022  continue
1023  subdir = os.path.relpath(log_subpath, stage.logdir)
1024  if subdir == '.':
1025  continue
1026  book_subpath = os.path.join(stage.bookdir, subdir)
1027  for file in files:
1028  if file.startswith('log') and file.endswith('.tar'):
1029  src = '%s/%s' % (log_subpath, file)
1030  dst = '%s/%s' % (book_subpath, file)
1031  flag = '%s.done' % dst
1032 
1033  # Decide if we need to copy this tarball to bookdir.
1034 
1035  if dst != src and not larbatch_posix.exists(flag):
1036 
1037  # Copy tarball to bookdir.
1038 
1039  print('Copying tarball %s into %s' % (src, book_subpath))
1040  if not larbatch_posix.isdir(book_subpath):
1041  larbatch_posix.makedirs(book_subpath)
1042  larbatch_posix.copy(src, dst)
1043 
1044  # Decide if we need to extract this tarball into bookdir.
1045 
1046  if not larbatch_posix.exists(flag):
1047 
1048  # Extract tarball.
1049 
1050  print('Extracting tarball %s' % dst)
1051  jobinfo = subprocess.Popen(['tar','-xf', dst, '-C', book_subpath,
1052  '--exclude=beam*.dat',
1053  '--exclude=beam*.info',
1054  '--exclude=core*',
1055  '--exclude=*.db',
1056  '--exclude=*.sh',
1057  '--exclude=*.py*',
1058  '--exclude=*.tar'],
1059  stdout=subprocess.PIPE,
1060  stderr=subprocess.PIPE)
1061  jobout, joberr = jobinfo.communicate()
1062  jobout = convert_str(jobout)
1063  joberr = convert_str(joberr)
1064  rc = jobinfo.poll()
1065  if rc != 0:
1066  print(jobout)
1067  print(joberr)
1068  print('Failed to extract log tarball in %s' % dst)
1069 
1070  else:
1071 
1072  # Create flag file.
1073 
1074  f = larbatch_posix.open(flag, 'w')
1075  f.write('\n') # Don't want zero size file.
1076  f.close()
1077 
1078  # Delete copy of tarball.
1079 
1080  if dst != src:
1081  larbatch_posix.remove(dst)
1082 
1083  return
1084 
1085 # Check project results in the specified directory.
1086 
1087 def docheck(project, stage, ana, quick=False):
1088 
1089  # This method performs various checks on worker subdirectories, named
1090  # as <cluster>_<process>, where <cluster> and <process> are integers.
1091  # In contrast, sam start and stop project jobs are named as
1092  # <cluster>_start and <cluster>_stop.
1093  #
1094  # Return 0 if all checks are OK, meaning:
1095  # a) No errors detected for any process.
1096  # b) At least one good root file (if not ana).
1097  # Otherwise return nonzero.
1098  #
1099  # The following checks are performed.
1100  #
1101  # 1. Make sure subdirectory names are as expected.
1102  #
1103  # 2. Look for at least one art root file in each worker subdirectory
1104  # containing a valid Events TTree. Complain about any
1105  # that do not contain such a root file.
1106  #
1107  # 3. Check that the number of events in the Events tree are as expected.
1108  #
1109  # 4. Complain about any duplicated art root file names (if sam metadata is defined).
1110  #
1111  # 5. Check job exit status (saved in lar.stat).
1112  #
1113  # 6. For sam input, make sure that files sam_project.txt and cpid.txt are present.
1114  #
1115  # 7. Check that any non-art root files are openable.
1116  #
1117  # 8. Make sure file names do not exceed 200 characters (if sam metadata is defined).
1118  #
1119 # In analysis mode (if argument ana != 0), skip checks 2-4, but still do
1120  # checks 1 and 5-7.
1121  #
1122  # This function also creates the following files in the specified directory.
1123  #
1124  # 1. files.list - List of good root files.
1125  # 2. events.list - List of good root files and number of events in each file.
1126  # 3. bad.list - List of worker subdirectories with problems.
1127  # 4. missing_files.list - List of unprocessed input files.
1128  # 5. sam_projects.list - List of successful sam projects.
1129  # 6. cpids.list - list of successful consumer process ids.
1130  # 7. filesana.list - List of non-art root files (histograms and/or ntuples).
1131  #
1132  # For projects with no input (i.e. generator jobs), if there are fewer than
1133  # the requisite number of good generator jobs, a "missing_files.list" will be
1134  # generated with lines containing /dev/null.
1135 
1136  # Untar log files into bookdir.
1137 
1138  untarlog(stage)
1139 
1140  # Quick check?
1141 
1142  if quick == 1 and not ana:
1143  return doquickcheck(project, stage, ana)
1144 
1145  stage.checkinput()
1146 
1147  # Check that output and log directories exist.
1148 
1149  if not larbatch_posix.exists(stage.outdir):
1150  print('Output directory %s does not exist.' % stage.outdir)
1151  return 1
1152  if not larbatch_posix.exists(stage.bookdir):
1153  print('Log directory %s does not exist.' % stage.bookdir)
1154  return 1
1155 
1156  import_samweb()
1157  has_metadata = project.file_type != '' or project.run_type != ''
1158  has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
1159  print('Checking directory %s' % stage.bookdir)
1160 
1161  # Count total number of events and root files.
1162 
1163  nev_tot = 0
1164  nroot_tot = 0
1165 
1166  # Loop over subdirectories (ignore files and directories named *_start and *_stop).
1167 
1168  procmap = {} # procmap[subdir] = <list of art root files and event counts>
1169  processes = [] # Integer process numbers derived from subdirectory names.
1170  filesana = [] # List of non-art root files.
1171  sam_projects = [] # List of sam projects.
1172  cpids = [] # List of successful sam consumer process ids.
1173  uris = [] # List of input files processed successfully.
1174  bad_workers = [] # List of bad worker subdirectories.
1175 
1176 
1177  for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):
1178 
1179  # Only examine files in leaf directories.
1180 
1181  if len(subdirs) != 0:
1182  continue
1183 
1184  subdir = os.path.relpath(log_subpath, stage.bookdir)
1185  if subdir == '.':
1186  continue
1187  out_subpath = os.path.join(stage.outdir, subdir)
1188  dirok = project_utilities.fast_isdir(log_subpath)
1189 
1190  # Update list of sam projects from start job.
1191 
1192  if dirok and log_subpath[-6:] == '_start':
1193  filename = os.path.join(log_subpath, 'sam_project.txt')
1194  if larbatch_posix.exists(filename):
1195  sam_project = larbatch_posix.readlines(filename)[0].strip()
1196  if sam_project != '' and not sam_project in sam_projects:
1197  sam_projects.append(sam_project)
1198 
1199  # Regular worker jobs checked here.
1200 
1201  if dirok and not subdir[-6:] == '_start' and not subdir[-5:] == '_stop' \
1202  and not subdir == 'log':
1203 
1204  bad = 0
1205 
1206  # Make sure that corresponding output directory exists.
1207 
1208  if not project_utilities.fast_isdir(out_subpath):
1209  print('No output directory corresponding to subdirectory %s.' % subdir)
1210  bad = 1
1211 
1212  # Check lar exit status (if any).
1213 
1214  if not bad:
1215  stat_filename = os.path.join(log_subpath, 'lar.stat')
1216  if larbatch_posix.exists(stat_filename):
1217  status = 0
1218  try:
1219  status = int(larbatch_posix.readlines(stat_filename)[0].strip())
1220  if status != 0:
1221  print('Job in subdirectory %s ended with non-zero exit status %d.' % (
1222  subdir, status))
1223  bad = 1
1224  except:
1225  print('Bad file lar.stat in subdirectory %s.' % subdir)
1226  bad = 1
1227 
1228  # Now check root files in this subdirectory.
1229 
1230  if not bad:
1231  nev = 0
1232  roots = []
1233  nev, roots, subhists = check_root(out_subpath, log_subpath, stage.datafiletypes)
1234  if not ana:
1235  if len(roots) == 0 or nev < 0:
1236  print('Problem with root file(s) in subdirectory %s.' % subdir)
1237  bad = 1
1238  elif nev < -1 or len(subhists) == 0:
1239  print('Problem with analysis root file(s) in subdirectory %s.' % subdir)
1240  bad = 1
1241 
1242 
1243  # Check for duplicate filenames (only if metadata is being generated).
1244 
1245  if not bad and has_metadata:
1246  for root in roots:
1247  rootname = os.path.basename(root[0])
1248  for s in list(procmap.keys()):
1249  oldroots = procmap[s]
1250  for oldroot in oldroots:
1251  oldrootname = os.path.basename(oldroot[0])
1252  if rootname == oldrootname:
1253  print('Duplicate filename %s in subdirectory %s' % (rootname,
1254  subdir))
1255  olddir = os.path.basename(os.path.dirname(oldroot[0]))
1256  print('Previous subdirectory %s' % olddir)
1257  bad = 1
1258 
1259  # Make sure root file names do not exceed 200 characters.
1260 
1261  if not bad and has_metadata:
1262  for root in roots:
1263  rootname = os.path.basename(root[0])
1264  if len(rootname) >= 200:
1265  print('Filename %s in subdirectory %s is longer than 200 characters.' % (
1266  rootname, subdir))
1267  bad = 1
1268 
1269  # Check existence of sam_project.txt and cpid.txt.
1270  # Update sam_projects and cpids.
1271 
1272  if not bad and stage.inputdef != '':
1273  filename1 = os.path.join(log_subpath, 'sam_project.txt')
1274  if not larbatch_posix.exists(filename1):
1275  print('Could not find file sam_project.txt')
1276  bad = 1
1277  filename2 = os.path.join(log_subpath, 'cpid.txt')
1278  if not larbatch_posix.exists(filename2):
1279  print('Could not find file cpid.txt')
1280  bad = 1
1281  if not bad:
1282  sam_project = larbatch_posix.readlines(filename1)[0].strip()
1283  if not sam_project in sam_projects:
1284  sam_projects.append(sam_project)
1285  cpid = larbatch_posix.readlines(filename2)[0].strip()
1286  if not cpid in cpids:
1287  cpids.append(cpid)
1288 
1289  # Check existence of transferred_uris.list.
1290  # Update list of uris.
1291 
1292  if not bad and (stage.inputlist !='' or stage.inputfile != ''):
1293  filename = os.path.join(log_subpath, 'transferred_uris.list')
1294  if not larbatch_posix.exists(filename):
1295  print('Could not find file transferred_uris.list')
1296  bad = 1
1297  if not bad:
1298  lines = larbatch_posix.readlines(filename)
1299  for line in lines:
1300  uri = line.strip()
1301  if uri != '':
1302  uris.append(uri)
1303 
1304  # Save process number, and check for duplicate process numbers
1305  # (only if no input).
1306 
1307  if not has_input:
1308  subdir_split = subdir.split('_')
1309  if len(subdir_split) > 1:
1310  process = int(subdir_split[1])
1311  if process in processes:
1312  print('Duplicate process number')
1313  bad = 1
1314  else:
1315  processes.append(process)
1316 
1317  # Save information about good root files.
1318 
1319  if not bad:
1320  procmap[subdir] = roots
1321 
1322  # Save good histogram files.
1323 
1324  filesana.extend(subhists)
1325 
1326  # Count good events and root files.
1327 
1328  nev_tot = nev_tot + nev
1329  nroot_tot = nroot_tot + len(roots)
1330 
1331  # Update list of bad workers.
1332 
1333  if bad:
1334  bad_workers.append(subdir)
1335 
1336  # Print/save result of checks for one subdirectory.
1337 
1338  if bad:
1339  print('Bad subdirectory %s.' % subdir)
1340 
1341 # Done looping over subdirectories.
1342  # Dictionary procmap now contains a list of good processes
1343  # and root files.
1344 
1345  # Before attempting to create bookkeeping files in stage.bookdir, check
1346  # whether this directory is readable. If not readable, return error
1347  # status without creating any bookkeeping files. This is to prevent
1348  # hangs.
1349 
1350  contents = larbatch_posix.listdir(stage.bookdir)
1351  if len(contents) == 0:
1352  print('Directory %s may be dead.' % stage.bookdir)
1353  print('Returning error status without creating any bookkeeping files.')
1354  return 1
1355 
1356  # Open files.
1357 
1358  filelistname = os.path.join(stage.bookdir, 'files.list')
1359  filelist = safeopen(filelistname)
1360 
1361  eventslistname = os.path.join(stage.bookdir, 'events.list')
1362  eventslist = safeopen(eventslistname)
1363 
1364  badfilename = os.path.join(stage.bookdir, 'bad.list')
1365  badfile = safeopen(badfilename)
1366 
1367  missingfilesname = os.path.join(stage.bookdir, 'missing_files.list')
1368  missingfiles = safeopen(missingfilesname)
1369 
1370  filesanalistname = os.path.join(stage.bookdir, 'filesana.list')
1371  filesanalist = safeopen(filesanalistname)
1372 
1373  urislistname = os.path.join(stage.bookdir, 'transferred_uris.list')
1374  urislist = safeopen(urislistname)
1375 
1376  # Generate "files.list" and "events.list."
1377  # Also fill stream-specific file list.
1378 
1379  nproc = 0
1380  streams = {} # {stream: file}
1381  nfile = 0
1382  for s in list(procmap.keys()):
1383  nproc = nproc + 1
1384  for root in procmap[s]:
1385  nfile = nfile + 1
1386  filelist.write('%s\n' % root[0])
1387  eventslist.write('%s %d\n' % root[:2])
1388  stream = root[2]
1389  if stream != '':
1390  if stream not in streams:
1391  streamlistname = os.path.join(stage.bookdir, 'files_%s.list' % stream)
1392  streams[stream] = safeopen(streamlistname)
1393  streams[stream].write('%s\n' % root[0])
1394 
1395  # Generate "bad.list"
1396 
1397  nerror = 0
1398  for bad_worker in bad_workers:
1399  badfile.write('%s\n' % bad_worker)
1400  nerror = nerror + 1
1401 
1402  # Generate "missing_files.list."
1403 
1404  nmiss = 0
1405  if stage.inputdef == '' and not stage.pubs_output:
1406  input_files = get_input_files(stage)
1407  if len(input_files) > 0:
1408  missing_files = list(set(input_files) - set(uris))
1409  for missing_file in missing_files:
1410  missingfiles.write('%s\n' % missing_file)
1411  nmiss = nmiss + 1
1412  else:
1413  nmiss = stage.num_jobs - len(procmap)
1414  for n in range(nmiss):
1415  missingfiles.write('/dev/null\n')
1416 
1417 
1418  # Generate "filesana.list."
1419 
1420  for hist in filesana:
1421  filesanalist.write('%s\n' % hist)
1422 
1423  # Generate "transferred_uris.list."
1424 
1425  for uri in uris:
1426  urislist.write('%s\n' % uri)
1427 
1428  # Print summary.
1429 
1430  if ana:
1431  print("%d processes completed successfully." % nproc)
1432  print("%d total good histogram files." % len(filesana))
1433  else:
1434  print("%d total good events." % nev_tot)
1435  print("%d total good root files." % nroot_tot)
1436  print("%d total good histogram files." % len(filesana))
1437 
1438  # Close files.
1439 
1440  filelist.close()
1441  if nfile == 0:
1442  project_utilities.addLayerTwo(filelistname)
1443  eventslist.close()
1444  if nfile == 0:
1445  project_utilities.addLayerTwo(eventslistname)
1446  if nerror == 0:
1447  badfile.write('\n')
1448  badfile.close()
1449  if nmiss == 0:
1450  missingfiles.write('\n')
1451  missingfiles.close()
1452  filesanalist.close()
1453  if len(filesana) == 0:
1454  project_utilities.addLayerTwo(filesanalistname)
1455  if len(uris) == 0:
1456  urislist.write('\n')
1457  urislist.close()
1458  for stream in list(streams.keys()):
1459  streams[stream].close()
1460 
1461  # Make sam files.
1462 
1463  if stage.inputdef != '' and not stage.pubs_input:
1464 
1465  # List of successful sam projects.
1466 
1467  sam_projects_filename = os.path.join(stage.bookdir, 'sam_projects.list')
1468  sam_projects_file = safeopen(sam_projects_filename)
1469  for sam_project in sam_projects:
1470  sam_projects_file.write('%s\n' % sam_project)
1471  sam_projects_file.close()
1472  if len(sam_projects) == 0:
1473  project_utilities.addLayerTwo(sam_projects_filename)
1474 
1475  # List of successful consumer process ids.
1476 
1477  cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
1478  cpids_file = safeopen(cpids_filename)
1479  for cpid in cpids:
1480  cpids_file.write('%s\n' % cpid)
1481  cpids_file.close()
1482  if len(cpids) == 0:
1483  project_utilities.addLayerTwo(cpids_filename)
1484 
1485  # Get number of consumed files.
1486 
1487  cpids_list = ''
1488  sep = ''
1489  for cpid in cpids:
1490  cpids_list = cpids_list + '%s%s' % (sep, cpid)
1491  sep = ','
1492  if cpids_list != '':
1493  dim = 'consumer_process_id %s and consumed_status consumed' % cpids_list
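# Illustrative example (hypothetical consumer process ids): with cpids_list = '1001,1002',
# dim becomes "consumer_process_id 1001,1002 and consumed_status consumed".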
1494  import_samweb()
1495  nconsumed = samweb.countFiles(dimensions=dim)
1496  else:
1497  nconsumed = 0
1498 
1499  # Get number of unconsumed files.
1500 
1501  if cpids_list != '':
1502  udim = '(defname: %s) minus (%s)' % (stage.inputdef, dim)
1503  else:
1504  udim = 'defname: %s' % stage.inputdef
1505  nunconsumed = samweb.countFiles(dimensions=udim)
1506  nerror = nerror + nunconsumed
1507 
1508  # Sam summary.
1509 
1510  print('%d sam projects.' % len(sam_projects))
1511  print('%d successful consumer process ids.' % len(cpids))
1512  print('%d files consumed.' % nconsumed)
1513  print('%d files not consumed.' % nunconsumed)
1514 
1515  # Check project statuses.
1516 
1517  for sam_project in sam_projects:
1518  print('\nChecking sam project %s' % sam_project)
1519  import_samweb()
1520  url = samweb.findProject(sam_project, project_utilities.get_experiment())
1521  if url != '':
1522  result = samweb.projectSummary(url)
1523  nd = 0
1524  nc = 0
1525  nf = 0
1526  nproc = 0
1527  nact = 0
1528  if 'processes' in result:
1529  processes = result['processes']
1530  for process in processes:
1531  nproc = nproc + 1
1532  if 'status' in process:
1533  if process['status'] == 'active':
1534  nact = nact + 1
1535  if 'counts' in process:
1536  counts = process['counts']
1537  if 'delivered' in counts:
1538  nd = nd + counts['delivered']
1539  if 'consumed' in counts:
1540  nc = nc + counts['consumed']
1541  if 'failed' in counts:
1542  nf = nf + counts['failed']
1543  print('Status: %s' % result['project_status'])
1544  print('%d total processes' % nproc)
1545  print('%d active processes' % nact)
1546  print('%d files in snapshot' % result['files_in_snapshot'])
1547  print('%d files delivered' % (nd + nc))
1548  print('%d files consumed' % nc)
1549  print('%d files failed' % nf)
1550  print()
1551 
1552  # Done
1553 
1554  checkfilename = os.path.join(stage.bookdir, 'checked')
1555  checkfile = safeopen(checkfilename)
1556  checkfile.write('\n')
1557  checkfile.close()
1558  project_utilities.addLayerTwo(checkfilename)
1559 
1560  if stage.inputdef == '' or stage.pubs_input:
1561  print('%d processes with errors.' % nerror)
1562  print('%d missing files.' % nmiss)
1563  else:
1564  print('%d unconsumed files.' % nerror)
1565 
1566  # Return error status if there was any error or no good root file was produced.
1567  # Also return error status if no successful processes were detected.
1568 
1569  result = 0
1570  if nerror != 0:
1571  result = 1
1572  if not ana and nroot_tot == 0:
1573  result = 1
1574  if len(procmap) == 0:
1575  result = 1
1576  return result
1577 
1578 def doquickcheck(project, stage, ana):
1579 
1580  # Check that output and log directories exist. Directories could be lost due to ifdh cp failures.
1581  if not larbatch_posix.isdir(stage.outdir):
1582  print('Output directory %s does not exist.' % stage.outdir)
1583  return 1
1584 
1585  if not larbatch_posix.isdir(stage.bookdir):
1586  print('Log directory %s does not exist.' % stage.bookdir)
1587  return 1
1588 
1589  print('Checking directory %s' % stage.bookdir)
1590 
1591  #Aggregate the .list files from the bookdir up one directory. This is where the old docheck would put them, and it double-checks that the files made it back from the worker node.
1592 
1593  goodFiles = [] # list of art root files
1594  goodAnaFiles = [] # list of analysis root files
1595  eventLists = [] # list of art root files and number of events
1596  badLists = [] # list of bad root files
1597  anaFiles = [] # list of ana files
1598  transferredFiles = [] # list of transferred files
1599  streamLists = {} # dictionary which keeps track of files per stream
1600 
1601  sam_projects = [] # list of sam projects
1602  cpids = [] # list of consumer process ids
1603 
1604  goodLogDirs = set() # Set of log directories.
1605  nErrors = 0 # Number of errors uncovered
1606 
1607  for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):
1608 
1609  # Only examine files in leaf directories.
1610 
1611  if len(subdirs) != 0:
1612  continue
1613 
1614  #skip start and stop project jobs for now
1615  if log_subpath[-6:] == '_start' or log_subpath[-5:] == '_stop':
1616  filename = os.path.join(log_subpath, 'sam_project.txt')
1617  if larbatch_posix.exists(filename):
1618  sam_project = larbatch_posix.readlines(filename)[0].strip()
1619  if sam_project != '' and not sam_project in sam_projects:
1620  sam_projects.append(sam_project)
1621  continue
1622 
1623 
1624  print('Doing quick check of directory %s.' % log_subpath)
1625 
1626  subdir = os.path.relpath(log_subpath, stage.bookdir)
1627 
1628  out_subpath = os.path.join(stage.outdir, subdir)
1629  dirok = project_utilities.fast_isdir(log_subpath)
1630 
1631  #First check missing_files.list.
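# The worker is expected to write the number of missing files as the first line of
# missing_files.list; a nonzero count, or a missing or empty file, marks this subdirectory as failed.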
1632 
1633 
1634  validateOK = 1
1635 
1636  missingfilesname = os.path.join(log_subpath, 'missing_files.list')
1637 
1638  #print missingfilesname
1639 
1640  try:
1641  #print 'Reading %s' % missingfilesname
1642  missingfiles = project_utilities.saferead(missingfilesname)
1643  #if we can't find missing_files the check will not work
1644  except:
1645  print('Cannot open file: %s' % missingfilesname)
1646  validateOK = 0
1647 
1648 
1649  if validateOK == 1 and len(missingfiles) == 0:
1650  print('%s exists, but is empty' % missingfilesname)
1651  validateOK = 0
1652 
1653 
1654  if validateOK == 1:
1655  line = missingfiles[0]
1656  line = line.strip('\n')
1657  if( int(line) != 0 ):
1658  validateOK = 0
1659 
1660 
1661  #If the validation failed, continue.
1662  if validateOK != 1:
1663  nErrors += 1
1664  continue
1665 
1666  #Copy files.
1667  #print 'Appending Files'
1668 
1669  # Check existence of sam_project.txt and cpid.txt.
1670  # Update sam_projects and cpids.
1671 
1672  if stage.inputdef != '':
1673 
1674  filename1 = os.path.join(log_subpath, 'sam_project.txt')
1675  if not larbatch_posix.exists(filename1):
1676  print('Could not find file sam_project.txt')
1677  nErrors += 1
1678  else:
1679  sam_project = larbatch_posix.readlines(filename1)[0].strip()
1680  if not sam_project in sam_projects:
1681  sam_projects.append(sam_project)
1682 
1683  filename2 = os.path.join(log_subpath, 'cpid.txt')
1684  if not larbatch_posix.exists(filename2):
1685  print('Could not find file cpid.txt')
1686  nErrors += 1
1687  else:
1688  cpid = larbatch_posix.readlines(filename2)[0].strip()
1689  if not cpid in cpids:
1690  cpids.append(cpid)
1691 
1692  filelistsrc = os.path.join(log_subpath, 'files.list')
1693  tmpArray = scan_file(filelistsrc)
1694 
1695  if( tmpArray == [ -1 ] ):
1696  nErrors += 1
1697  else:
1698  goodFiles.extend(tmpArray)
1699 
1700  fileanalistsrc = os.path.join(log_subpath, 'filesana.list')
1701  tmpArray = scan_file(fileanalistsrc)
1702 
1703  if( not tmpArray == [ -1 ] ):
1704  goodAnaFiles.extend(tmpArray)
1705 
1706  eventlistsrc = os.path.join(log_subpath, 'events.list')
1707 
1708  tmpArray = scan_file(eventlistsrc)
1709 
1710  if( tmpArray == [ -1 ] ):
1711  nErrors += 1
1712  else:
1713  eventLists.extend(tmpArray)
1714 
1715 
1716  badfilesrc = os.path.join(log_subpath, 'bad.list')
1717 
1718 
1719  tmpArray = scan_file(badfilesrc)
1720 
1721  #The bad list being empty is okay.
1722  if( tmpArray == [ -1 ] ):
1723  pass
1724  else:
1725  badLists.extend(tmpArray)
1726 
1727  '''
1728  missingfilesrc = os.path.join(log_subpath, 'missing_files.list')
1729 
1730  tmpArray = scan_file(missingfilesrc)
1731 
1732  if( tmpArray == [ -1 ] ):
1733  nErrors += 1
1734  else:
1735  missingLists.extend(tmpArray)
1736  '''
1737 
1738  #if ana:
1739  # filesanalistsrc = os.path.join(log_subpath, 'filesana.list')
1740 
1741  # tmpArray = scan_file(filesanalistsrc)
1742 
1743  # if( tmpArray == [ -1 ] ):
1744  # nErrors += 1
1745  # else:
1746  # anaFiles.extend(tmpArray)
1747 
1748  urislistsrc = os.path.join(log_subpath, 'transferred_uris.list')
1749 
1750  tmpArray = scan_file(urislistsrc)
1751 
1752  #An empty uri file is not necessarily an error.
1753  if( tmpArray == [ -1 ] ):
1754  pass
1755  else:
1756  transferredFiles.extend(tmpArray)
1757  #create a list of files_*.list files. These are outputs from specific streams
1758  streamList = larbatch_posix.listdir(log_subpath)
1759 
1760  for stream in streamList:
1761  if( stream[:6] != "files_" ):
1762  continue
1763  streamfilesrc = os.path.join(log_subpath, stream)
1764  #print stream
1765  tmpArray = scan_file(streamfilesrc)
1766  if( tmpArray == [ -1 ] ):
1767  nErrors += 1
1768  else:
1769  if(streamLists.get(stream, "empty") == "empty" ):
1770  streamLists[stream] = tmpArray
1771  else:
1772  streamLists[stream].extend(tmpArray)
1773 
1774  if validateOK == 1:
1775  goodLogDirs.add(log_subpath)
1776 
1777  checkfilename = os.path.join(stage.bookdir, 'checked')
1778  checkfile = safeopen(checkfilename)
1779  checkfile.write('\n')
1780  checkfile.close()
1781 
1782  #create the input files.list for the next stage
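# If there is exactly one good log directory, its files.list is simply symlinked into bookdir;
# otherwise the good files from all log directories are aggregated into a new list. The same
# pattern is repeated below for filesana.list, events.list, transferred_uris.list, and friends.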
1783  filelistdest = os.path.join(stage.bookdir, 'files.list')
1784  if larbatch_posix.exists(filelistdest):
1785  #print 'Deleting %s' % filelistdest
1786  larbatch_posix.remove(filelistdest)
1787  if len(goodLogDirs) == 1:
1788  src = '%s/files.list' % goodLogDirs.copy().pop()
1789  #print 'Symlinking %s to %s' % (src, filelistdest)
1790  larbatch_posix.symlink(src, filelistdest)
1791  else:
1792  #print 'Aggregating files.list'
1793  inputList = safeopen(filelistdest)
1794  for goodFile in goodFiles:
1795  #print goodFile
1796  inputList.write("%s\n" % goodFile)
1797  inputList.close()
1798  if len(goodFiles) == 0:
1799  project_utilities.addLayerTwo(filelistdest)
1800 
1801  #create the aggregated filesana.list
1802  fileanalistdest = os.path.join(stage.bookdir, 'filesana.list')
1803  if larbatch_posix.exists(fileanalistdest):
1804  #print 'Deleting %s' % fileanalistdest
1805  larbatch_posix.remove(fileanalistdest)
1806  if len(goodLogDirs) == 1:
1807  src = '%s/filesana.list' % goodLogDirs.copy().pop()
1808  #print 'Symlinking %s to %s' % (src, fileanalistdest)
1809  larbatch_posix.symlink(src, fileanalistdest)
1810  else:
1811  #print 'Aggregating filesana.list'
1812  anaList = safeopen(fileanalistdest)
1813  for goodAnaFile in goodAnaFiles:
1814  #print goodAnaFile
1815  anaList.write("%s\n" % goodAnaFile)
1816  anaList.close()
1817  if len(goodAnaFiles) == 0:
1818  project_utilities.addLayerTwo(fileanalistdest)
1819 
1820  #create the events.list for the next step
1821  eventlistdest = os.path.join(stage.bookdir, 'events.list')
1822  if larbatch_posix.exists(eventlistdest):
1823  #print 'Deleting %s' % eventlistdest
1824  larbatch_posix.remove(eventlistdest)
1825  if len(goodLogDirs) == 1:
1826  src = '%s/events.list' % goodLogDirs.copy().pop()
1827  #print 'Symlinking %s to %s' % (src, eventlistdest)
1828  larbatch_posix.symlink(src, eventlistdest)
1829  else:
1830  #print 'Aggregating events.list'
1831  eventsOutList = safeopen(eventlistdest)
1832  for event in eventLists:
1833  #print event
1834  eventsOutList.write("%s\n" % event)
1835  eventsOutList.close()
1836  if len(eventLists) == 0:
1837  project_utilities.addLayerTwo(eventlistdest)
1838 
1839  #create the bad.list for makeup jobs
1840  if(len(badLists) > 0):
1841  badlistdest = os.path.join(stage.bookdir, 'bad.list')
1842  badOutList = safeopen(badlistdest)
1843  for bad in badLists:
1844  badOutList.write("%s\n" % bad)
1845  badOutList.close()
1846  #project_utilities.addLayerTwo(badlistdest)
1847 
1848  #create the missing_files.list for makeup jobs
1849  missing_files = []
1850  if stage.inputdef == '' and not stage.pubs_output:
1851  input_files = get_input_files(stage)
1852  if len(input_files) > 0:
1853  missing_files = list(set(input_files) - set(transferredFiles))
1854 
1855  if len(missing_files) > 0:
1856  missinglistdest = os.path.join(stage.bookdir, 'missing_files.list')
1857  missingOutList = safeopen(missinglistdest)
1858  for missing in missing_files:
1859  missingOutList.write("%s\n" % missing)
1860  missingOutList.close()
1861  #project_utilities.addLayerTwo(missingOutList)
1862 
1863  #create the transferred_uris for the next step
1864  urilistdest = os.path.join(stage.bookdir, 'transferred_uris.list')
1865  if larbatch_posix.exists(urilistdest):
1866  #print 'Deleting %s' % urilistdest
1867  larbatch_posix.remove(urilistdest)
1868  if len(goodLogDirs) == 1 and len(transferredFiles) > 0:
1869  src = '%s/transferred_uris.list' % goodLogDirs.copy().pop()
1870  #print 'Symlinking %s to %s' % (src, urilistdest)
1871  larbatch_posix.symlink(src, urilistdest)
1872  else:
1873  #print 'Aggregating transferred_uris.list'
1874  uriOutList = safeopen(urilistdest)
1875  for uri in transferredFiles:
1876  #print event
1877  uriOutList.write("%s\n" % uri)
1878  uriOutList.close()
1879  if len(transferredFiles) == 0:
1880  project_utilities.addLayerTwo(urilistdest)
1881 
1882  if stage.inputdef != '':
1883  samprojectdest = os.path.join(stage.bookdir, 'sam_projects.list')
1884  if larbatch_posix.exists(samprojectdest):
1885  #print 'Deleting %s' % samprojectdest
1886  larbatch_posix.remove(samprojectdest)
1887  if len(goodLogDirs) == 1:
1888  src = '%s/sam_project.txt' % goodLogDirs.copy().pop()
1889  #print 'Symlinking %s to %s' % (src, samprojectdest)
1890  larbatch_posix.symlink(src, samprojectdest)
1891  else:
1892  #print 'Aggregating sam_projects.list'
1893  samprojectfile = safeopen(samprojectdest)
1894  for sam in sam_projects:
1895  samprojectfile.write("%s\n" % sam)
1896  samprojectfile.close()
1897  if len(sam_projects) == 0:
1898  project_utilities.addLayerTwo(samprojectdest)
1899 
1900  cpiddest = os.path.join(stage.bookdir, 'cpids.list')
1901  if larbatch_posix.exists(cpiddest):
1902  #print 'Deleting %s' % cpiddest
1903  larbatch_posix.remove(cpiddest)
1904  if len(goodLogDirs) == 1:
1905  src = '%s/cpid.txt' % goodLogDirs.copy().pop()
1906  #print 'Symlinking %s to %s' % (src, cpiddest)
1907  larbatch_posix.symlink(src, cpiddest)
1908  else:
1909  #print 'Aggregating cpids.list'
1910  cpidfile = safeopen(cpiddest)
1911  for cp in cpids:
1912  cpidfile.write("%s \n" % cp)
1913  cpidfile.close()
1914  if len(cpids) == 0:
1915  project_utilities.addLayerTwo(cpiddest)
1916 
1917 
1918  for stream in streamLists:
1919  streamdest = os.path.join(stage.bookdir, stream)
1920  if larbatch_posix.exists(streamdest):
1921  #print 'Deleting %s' % streamdest
1922  larbatch_posix.remove(streamdest)
1923  if len(goodLogDirs) == 1:
1924  src = '%s/%s' % (goodLogDirs.copy().pop(), stream)
1925  #print 'Symlinking %s to %s' % (src, streamdest)
1926  larbatch_posix.symlink(src, streamdest)
1927  else:
1928  #print 'Aggregating %s' % stream
1929  streamOutList = safeopen(streamdest)
1930  for line in streamLists[stream]:
1931  streamOutList.write("%s\n" % line)
1932  streamOutList.close()
1933  if len(streamLists[stream]) == 0:
1934  project_utilities.addLayerTwo(streamdest)
1935 
1936 
1937 
1938 
1939 
1940  print('Number of errors = %d' % nErrors)
1941 
1942  return nErrors
1943 
1944 # Fetch jobsub log files for the specified project and stage.
1945 
1946 def dofetchlog(project, stage):
1947 
1948  # This function fetches jobsub log files using the command
1949  # jobsub_fetchlog. Fetched log files are stored in a subdirectory
1950  # called "log" in the stage output directory.
1951  #
1952  # This function uses an algorithm to determine the log file
1953  # job id that is based on the worker environment as recorded in
1954  # file "env.txt" as returned from any worker. Therefore, at least
1955  # one worker must have completed (successfully or not) for this
1956  # function to succeed.
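# Illustrative example of the job id fix-up done below (hypothetical id): an environment value
# JOBSUBJOBID=12345678.3@jobsub01.fnal.gov yields the log id 12345678.0@jobsub01.fnal.gov.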
1957 
1958  stage.checkinput()
1959  stage.checkdirs()
1960 
1961  # Look for files called "env.txt" in any subdirectory of
1962  # stage.bookdir.
1963 
1964  logids = []
1965  for dirpath, dirnames, filenames in larbatch_posix.walk(stage.bookdir):
1966  for filename in filenames:
1967  if filename == 'env.txt':
1968 
1969  # Look for either environment variable:
1970  #
1971  # 1. JOBSUBPARENTJOBID
1972  # 2. JOBSUBJOBID
1973  #
1974  # In either case, construct the log file id by
1975  # changing the process number to zero.
1976 
1977  logid = ''
1978  envpath = os.path.join(dirpath, filename)
1979  vars = larbatch_posix.readlines(envpath)
1980 
1981  # JOBSUBPARENTJOBID
1982 
1983  for var in vars:
1984  varsplit = var.split('=', 1)
1985  name = varsplit[0].strip()
1986  if name == 'JOBSUBPARENTJOBID':
1987  logid = varsplit[1].strip()
1988 
1989  # Fix up the log file id by changing the process
1990  # number to zero.
1991 
1992  logsplit = logid.split('@', 1)
1993  cluster_process = logsplit[0]
1994  server = logsplit[1]
1995  cluster = cluster_process.split('.', 1)[0]
1996  logid = cluster + '.0' + '@' + server
1997  logids.append(logid)
1998  break
1999 
2000  # JOBSUBJOBID
2001 
2002  if logid == '':
2003  for var in vars:
2004  varsplit = var.split('=', 1)
2005  name = varsplit[0].strip()
2006  if name == 'JOBSUBJOBID':
2007  logid = varsplit[1].strip()
2008 
2009  # Fix up the log file id by changing the process
2010  # number to zero.
2011 
2012  logsplit = logid.split('@', 1)
2013  cluster_process = logsplit[0]
2014  server = logsplit[1]
2015  cluster = cluster_process.split('.', 1)[0]
2016  logid = cluster + '.0' + '@' + server
2017  logids.append(logid)
2018  break
2019 
2020  # Process all of the log ids that we found.
2021 
2022  if len(logids) > 0:
2023 
2024  # Make a directory to receive log files.
2025 
2026  logdir = os.path.join(stage.bookdir, 'log')
2027  if larbatch_posix.exists(logdir):
2028  larbatch_posix.rmtree(logdir)
2029  larbatch_posix.mkdir(logdir)
2030 
2031  # Loop over log ids.
2032 
2033  for logid in set(logids):
2034 
2035  # Do the actual fetch.
2036  # The tarball is fetched into the current directory and unpacked
2037  # into the log directory.
2038 
2039  print('Fetching log files for id %s' % logid)
2040  command = ['jobsub_fetchlog']
2041  if project.server != '-' and project.server != '':
2042  command.append('--jobsub-server=%s' % project.server)
2043  command.append('--jobid=%s' % logid)
2044  command.append('--dest-dir=%s' % logdir)
2045  jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2046  jobout, joberr = jobinfo.communicate()
2047  jobout = convert_str(jobout)
2048  joberr = convert_str(joberr)
2049  rc = jobinfo.poll()
2050  if rc != 0:
2051  raise JobsubError(command, rc, jobout, joberr)
2052 
2053  return 0
2054 
2055  else:
2056 
2057  # Done (failure).
2058  # If we fall out of the loop, we didn't find any files called env.txt, or
2059  # they didn't contain the right environment variables we need.
2060  # In this case, the most likely explanation is that no workers have
2061  # completed yet.
2062 
2063  print('Failed to fetch log files.')
2064  return 1
2065 
2066 
2067 # Check sam declarations.
2068 # Return 0 if all files are declared or don't have internal metadata.
2069 # Return nonzero if some files have metadata but are not declared.
2070 
2071 def docheck_declarations(logdir, outdir, declare, ana=False):
2072 
2073  # Default result success (all files declared).
2074 
2075  result = 0
2076 
2077  # Initialize samweb.
2078 
2079  import_samweb()
2080 
2081  # Loop over root files listed in files.list or filesana.list.
2082 
2083  roots = []
2084  listname = 'files.list'
2085  if ana:
2086  listname = 'filesana.list'
2087  fnlist = os.path.join(logdir, listname)
2088  if larbatch_posix.exists(fnlist):
2089  roots = larbatch_posix.readlines(fnlist)
2090  else:
2091  raise RuntimeError('No %s file found %s, run project.py --check' % (listname, fnlist))
2092 
2093  for root in roots:
2094  path = root.strip()
2095  fn = os.path.basename(path)
2096  dirpath = os.path.dirname(path)
2097  dirname = os.path.relpath(dirpath, outdir)
2098 
2099  # Check metadata
2100 
2101  has_metadata = False
2102  try:
2103  md = samweb.getMetadata(filenameorid=fn)
2104  has_metadata = True
2105  except samweb_cli.exceptions.FileNotFound:
2106  pass
2107 
2108  # Report or declare file.
2109 
2110  if has_metadata:
2111  print('Metadata OK: %s' % fn)
2112  else:
2113  if declare:
2114  print('Declaring: %s' % fn)
2115  jsonfile = os.path.join(logdir, os.path.join(dirname, fn)) + '.json'
2116  mdjson = {}
2117  if larbatch_posix.exists(jsonfile):
2118  mdlines = larbatch_posix.readlines(jsonfile)
2119  mdtext = ''
2120  for line in mdlines:
2121  mdtext = mdtext + line
2122  try:
2123  md = json.loads(mdtext)
2124  mdjson = md
2125  except:
2126  pass
2127  md = {}
2128  if ana:
2129  md = mdjson
2130  else:
2131  expSpecificMetaData = expMetaData(os.environ['SAM_EXPERIMENT'],larbatch_posix.root_stream(path))
2132  md = expSpecificMetaData.getmetadata(mdjson)
2133  if len(md) > 0:
2134  project_utilities.test_kca()
2135 
2136  # Make lack of parent files a nonfatal error.
2137  # This should probably be removed at some point.
2138 
2139  try:
2140  samweb.declareFile(md=md)
2141  except:
2142  #if md.has_key('parents'):
2143  # del md['parents']
2144  # samweb.declareFile(md=md)
2145  print('SAM declare failed.')
2146  result = 1
2147 
2148  else:
2149  print('No sam metadata found for %s.' % fn)
2150  else:
2151  print('Not declared: %s' % fn)
2152  result = 1
2153 
2154  return result
2155 
2156 # Print summary of files returned by sam query.
2157 
2158 def dotest_declarations(dim):
2159 
2160  # Initialize samweb.
2161 
2162  import_samweb()
2163 
2164  # Do query
2165 
2166  result = samweb.listFilesSummary(dimensions=dim)
2167  for key in list(result.keys()):
2168  print('%s: %s' % (key, result[key]))
2169 
2170  return 0
2171 
2172 # Check sam dataset definition.
2173 # Return 0 if dataset is defined or definition name is null.
2174 # Return nonzero if dataset is not defined.
2175 
2176 def docheck_definition(defname, dim, define):
2177 
2178  # Default result success.
2179 
2180  result = 0
2181 
2182  # Return success for null definition.
2183 
2184  if defname == '':
2185  return result
2186 
2187  # Initialize samweb.
2188 
2189  import_samweb()
2190 
2191  # See if this definition already exists.
2192 
2193  def_exists = False
2194  try:
2195  desc = samweb.descDefinition(defname=defname)
2196  def_exists = True
2197  except samweb_cli.exceptions.DefinitionNotFound:
2198  pass
2199 
2200  # Make report and maybe make definition.
2201 
2202  if def_exists:
2203  print('Definition already exists: %s' % defname)
2204  else:
2205  if define:
2206  print('Creating definition %s.' % defname)
2207  project_utilities.test_kca()
2208  samweb.createDefinition(defname=defname, dims=dim)
2209  else:
2210  result = 1
2211  print('Definition should be created: %s' % defname)
2212 
2213  return result
2214 
2215 # Print summary of files returned by dataset definition.
2216 
2217 def dotest_definition(defname):
2218 
2219  # Initialize samweb.
2220 
2221  import_samweb()
2222 
2223  # Do query
2224 
2225  result = samweb.listFilesSummary(defname=defname)
2226  for key in list(result.keys()):
2227  print('%s: %s' % (key, result[key]))
2228 
2229  return 0
2230 
2231 # Delete sam dataset definition.
2232 
2233 def doundefine(defname):
2234 
2235  if defname == '':
2236  return 1
2237 
2238  # Initialize samweb.
2239 
2240  import_samweb()
2241 
2242  # See if this definition already exists.
2243 
2244  def_exists = False
2245  try:
2246  desc = samweb.descDefinition(defname=defname)
2247  def_exists = True
2248  except samweb_cli.exceptions.DefinitionNotFound:
2249  pass
2250 
2251  # Make report and maybe delete definition.
2252 
2253  if def_exists:
2254  print('Deleting definition: %s' % defname)
2255  project_utilities.test_kca()
2256  samweb.deleteDefinition(defname=defname)
2257  else:
2258  print('No such definition: %s' % defname)
2259 
2260  return 0
2261 
2262 # Check disk locations. Maybe add or remove locations.
2263 # This method only generates output and returns zero.
2264 
2265 def docheck_locations(dim, outdir, add, clean, remove, upload):
2266 
2267  if add:
2268  print('Adding disk locations.')
2269  elif clean:
2270  print('Cleaning disk locations.')
2271  elif remove:
2272  print('Removing disk locations.')
2273  elif upload:
2274  print('Uploading to FTS.')
2275  else:
2276  print('Checking disk locations.')
2277 
2278  # Initialize samweb.
2279 
2280  import_samweb()
2281 
2282  # Loop over files queried by dimension string.
2283 
2284  filelist = samweb.listFiles(dimensions=dim, stream=False)
2285 
2286  # Look for listed files on disk under outdir.
2287 
2288  disk_dict = {}
2289  for filename in filelist:
2290  disk_dict[filename] = []
2291  for out_subpath, subdirs, files in larbatch_posix.walk(outdir):
2292 
2293  # Only examine files in leaf directories.
2294 
2295  if len(subdirs) != 0:
2296  continue
2297 
2298  for fn in files:
2299  if fn in filelist:
2300  disk_dict[fn].append(out_subpath)
2301 
2302  # Check sam locations.
2303 
2304  for filename in filelist:
2305  disk_locs = disk_dict[filename]
2306  sam_locs = samweb.locateFile(filenameorid=filename)
2307  if len(sam_locs) == 0 and not upload:
2308  print('No location: %s' % filename)
2309 
2310  # Make a double loop over disk and sam locations, in order
2311  # to identify locations that should be added.
2312  # Note that we ignore the node part of the sam location.
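# A sam disk location is assumed to look like "<node>:<path>" (for example, a hypothetical
# "some-server:/pnfs/.../subdir"); only the path after the final colon is compared with disk_loc.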
2313 
2314  locs_to_add = []
2315  for disk_loc in disk_locs:
2316  should_add = True
2317  for sam_loc in sam_locs:
2318  if sam_loc['location_type'] == 'disk':
2319  if disk_loc == sam_loc['location'].split(':')[-1]:
2320  should_add = False
2321  break
2322  if should_add:
2323  locs_to_add.append(disk_loc)
2324 
2325  # Loop over sam locations, in order to identify locations
2326  # that should be removed. Note that for this step, we don't
2327  # necessarily assume that we found the disk location
2328  # in the directory search above, rather check the existence
2329  # of the file directly.
2330 
2331  locs_to_remove = []
2332  for sam_loc in sam_locs:
2333  if sam_loc['location_type'] == 'disk':
2334 
2335  # If remove is specified, unconditionally remove this location.
2336 
2337  if remove:
2338  locs_to_remove.append(sam_loc['location'])
2339 
2340  # Otherwise, check if file exists.
2341 
2342  else:
2343 
2344  # Split off the node, if any, from the location.
2345 
2346  local_path = os.path.join(sam_loc['location'].split(':')[-1], filename)
2347  if not larbatch_posix.exists(local_path):
2348  locs_to_remove.append(sam_loc['location'])
2349 
2350  # Loop over sam locations and identify files that can be uploaded.
2351  # If this file has no disk locations, don't do anything (not an error).
2352  # In case we decide to upload this file, always upload from the first
2353  # disk location.
2354 
2355  locs_to_upload = {} # locs_to_upload[disk-location] = dropbox-directory
2356  should_upload = False
2357  if upload and len(disk_locs) > 0:
2358  should_upload = True
2359  for sam_loc in sam_locs:
2360  if sam_loc['location_type'] == 'tape':
2361  should_upload = False
2362  break
2363  if should_upload:
2364  dropbox = project_utilities.get_dropbox(filename)
2365  if not larbatch_posix.exists(dropbox):
2366  print('Making dropbox directory %s.' % dropbox)
2367  larbatch_posix.makedirs(dropbox)
2368  locs_to_upload[disk_locs[0]] = dropbox
2369 
2370  # Report results and do the actual adding/removing/uploading.
2371 
2372  for loc in locs_to_add:
2373  node = project_utilities.get_bluearc_server()
2374  if loc[0:6] == '/pnfs/':
2375  node = project_utilities.get_dcache_server()
2376  loc = node + loc.split(':')[-1]
2377  if add:
2378  print('Adding location: %s.' % loc)
2379  project_utilities.test_kca()
2380  samweb.addFileLocation(filenameorid=filename, location=loc)
2381  elif not upload:
2382  print('Can add location: %s.' % loc)
2383 
2384  for loc in locs_to_remove:
2385  if clean or remove:
2386  print('Removing location: %s.' % loc)
2387  project_utilities.test_kca()
2388  samweb.removeFileLocation(filenameorid=filename, location=loc)
2389  elif not upload:
2390  print('Should remove location: %s.' % loc)
2391 
2392  for loc in list(locs_to_upload.keys()):
2393  dropbox = locs_to_upload[loc]
2394 
2395  # Make sure dropbox directory exists.
2396 
2397  if not larbatch_posix.isdir(dropbox):
2398  print('Dropbox directory %s does not exist.' % dropbox)
2399  else:
2400 
2401  # Test whether this file has already been copied to dropbox directory.
2402 
2403  dropbox_filename = os.path.join(dropbox, filename)
2404  if larbatch_posix.exists(dropbox_filename):
2405  print('File %s already exists in dropbox %s.' % (filename, dropbox))
2406  else:
2407 
2408  # Copy file to dropbox.
2409 
2410  loc_filename = os.path.join(loc, filename)
2411 
2412  # Decide whether to use a symlink or copy.
2413 
2414  if project_utilities.mountpoint(loc_filename) == \
2415  project_utilities.mountpoint(dropbox_filename):
2416  print('Symlinking %s to dropbox directory %s.' % (filename, dropbox))
2417  relpath = os.path.relpath(os.path.realpath(loc_filename), dropbox)
2418  print('relpath=',relpath)
2419  print('dropbox_filename=',dropbox_filename)
2420  larbatch_posix.symlink(relpath, dropbox_filename)
2421 
2422  else:
2423  print('Copying %s to dropbox directory %s.' % (filename, dropbox))
2424  larbatch_posix.copy(loc_filename, dropbox_filename)
2425 
2426  return 0
2427 
2428 # Check tape locations.
2429 # Return 0 if all files in sam have tape locations.
2430 # Return nonzero if some files in sam don't have tape locations.
2431 
2432 def docheck_tape(dim):
2433 
2434  # Default result success.
2435 
2436  result = 0
2437 
2438  # Initialize samweb.
2439 
2440  import_samweb()
2441 
2442  # Loop over files queried by dimension string.
2443 
2444  nbad = 0
2445  ntot = 0
2446  filelist = samweb.listFiles(dimensions=dim, stream=True)
2447  while 1:
2448  try:
2449  filename = next(filelist)
2450  except StopIteration:
2451  break
2452 
2453  # Got a filename.
2454 
2455  ntot = ntot + 1
2456 
2457  # Look for sam tape locations.
2458 
2459  is_on_tape = False
2460  sam_locs = samweb.locateFile(filenameorid=filename)
2461  for sam_loc in sam_locs:
2462  if sam_loc['location_type'] == 'tape':
2463  is_on_tape = True
2464  break
2465 
2466  if is_on_tape:
2467  print('On tape: %s' % filename)
2468  else:
2469  result = 1
2470  nbad = nbad + 1
2471  print('Not on tape: %s' % filename)
2472 
2473  print('%d files.' % ntot)
2474  print('%d files need to be stored on tape.' % nbad)
2475 
2476  return result
2477 
2478 # Copy files to workdir and issue jobsub submit command.
2479 # Return jobsubid.
2480 # Raise exception if jobsub_submit returns a nonzero status.
2481 
2482 def dojobsub(project, stage, makeup, recur, dryrun):
2483 
2484  # Default return.
2485 
2486  jobid = ''
2487 
2488  # Process map, to be filled later if we need one.
2489 
2490  procmap = ''
2491 
2492  # Temporary directory where we will copy the batch script(s) and dag.
2493 
2494  tmpdir = tempfile.mkdtemp()
2495 
2496  # Temporary directory where we will copy files destined for stage.workdir.
2497 
2498  tmpworkdir = tempfile.mkdtemp()
2499 
2500  #we're going to let jobsub_submit copy the workdir contents for us
2501  #each file that would go into the workdir is going to be added with
2502  # '-f <input_file>' with the full path; it can be on either BlueArc or /pnfs/uboone
2503 
2504  jobsub_workdir_files_args = []
2505 
2506  # If there is an input list, copy it to the work directory.
2507 
2508  input_list_name = ''
2509  if stage.inputlist != '':
2510  input_list_name = os.path.basename(stage.inputlist)
2511  work_list_name = os.path.join(tmpworkdir, input_list_name)
2512  if stage.inputlist != work_list_name:
2513  input_files = larbatch_posix.readlines(stage.inputlist)
2514  print('Making input list.')
2515  work_list = safeopen(work_list_name)
2516  for input_file in input_files:
2517  print('Adding input file %s' % input_file)
2518  work_list.write('%s\n' % input_file.strip())
2519  work_list.close()
2520  print('Done making input list.')
2521 
2522  # Now locate the fcl file on the fcl search path.
2523 
2524  fcls = project.get_fcl(stage.fclname)
2525 
2526  # Copy the fcl file to the work directory.
2527 
2528  for fcl in fcls:
2529  workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
2530  if os.path.abspath(fcl) != os.path.abspath(workfcl):
2531  larbatch_posix.copy(fcl, workfcl)
2532 
2533 
2534  # Construct a wrapper fcl file (called "wrapper.fcl") that will include
2535  # the original fcls, plus any overrides that are dynamically generated
2536  # in this script.
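# Each stage's fcl is bracketed below by "#---STAGE <n>" and "#---END_STAGE" markers,
# presumably so the batch script can split the wrapper back into per-stage pieces.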
2537 
2538  #print 'Making wrapper.fcl'
2539  wrapper_fcl_name = os.path.join(tmpworkdir, 'wrapper.fcl')
2540  wrapper_fcl = safeopen(wrapper_fcl_name)
2541  stageNum = 0
2542  original_project_name = project.name
2543  original_stage_name = stage.name
2544  original_project_version = project.version
2545 
2546  for fcl in fcls:
2547  wrapper_fcl.write('#---STAGE %d\n' % stageNum)
2548  wrapper_fcl.write('#include "%s"\n' % os.path.basename(fcl))
2549  wrapper_fcl.write('\n')
2550 
2551  # Generate overrides for sam metadata fcl parameters.
2552  # Only do this if our xml file appears to contain sam metadata.
2553 
2554  xml_has_metadata = project.file_type != '' or \
2555  project.run_type != ''
2556  if xml_has_metadata:
2557 
2558  # Add overrides for FileCatalogMetadata.
2559 
2560  if project.release_tag != '':
2561  wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
2562  project.release_tag)
2563  else:
2564  wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "test"\n')
2565  if project.file_type:
2566  wrapper_fcl.write('services.FileCatalogMetadata.fileType: "%s"\n' % \
2567  project.file_type)
2568  if project.run_type:
2569  wrapper_fcl.write('services.FileCatalogMetadata.runType: "%s"\n' % \
2570  project.run_type)
2571 
2572 
2573  # Add experiment-specific sam metadata.
2574 
2575  if stageNum < len(stage.project_name) and stage.project_name[stageNum] != '':
2576  project.name = stage.project_name[stageNum]
2577  if stageNum < len(stage.stage_name) and stage.stage_name[stageNum] != '':
2578  stage.name = stage.stage_name[stageNum]
2579  if stageNum < len(stage.project_version) and stage.project_version[stageNum] != '':
2580  project.version = stage.project_version[stageNum]
2581  sam_metadata = project_utilities.get_sam_metadata(project, stage)
2582  if sam_metadata:
2583  wrapper_fcl.write(sam_metadata)
2584  project.name = original_project_name
2585  stage.name = original_stage_name
2586  project.version = original_project_version
2587 
2588  # In case of generator jobs, add override for pubs run number
2589  # (subrun number is overridden inside condor_lar.sh).
2590 
2591  if (not stage.pubs_input and stage.pubs_output) or stage.output_run:
2592  wrapper_fcl.write('source.firstRun: %d\n' % stage.output_run)
2593 
2594  # Add overrides for genie flux parameters.
2595  # This section will normally be generated for any kind of generator job,
2596  # and should be harmless for non-genie generators.
2597 
2598  if stage.maxfluxfilemb != 0 and stageNum == 0:
2599  wrapper_fcl.write('physics.producers.generator.FluxCopyMethod: "IFDH"\n')
2600  wrapper_fcl.write('physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
2601  wrapper_fcl.write('#---END_STAGE\n')
2602  stageNum = 1 + stageNum
2603 
2604  wrapper_fcl.close()
2605  #print 'Done making wrapper.fcl'
2606 
2607  # Get experiment setup script. Maybe copy to work directory.
2608  # After this section, exactly one of the variables abssetupscript or
2609  # setupscript will be set to a non-null value.
2610 
2611  abssetupscript = project_utilities.get_setup_script_path()
2612  setupscript = ''
2613  if not abssetupscript.startswith('/cvmfs/'):
2614  setupscript = os.path.join(stage.workdir,'setup_experiment.sh')
2615  larbatch_posix.copy(abssetupscript, setupscript)
2616  jobsub_workdir_files_args.extend(['-f', setupscript])
2617  abssetupscript = ''
2618 
2619  # Copy and rename batch script to the work directory.
2620 
2621  if stage.batchname != '':
2622  workname = stage.batchname
2623  else:
2624  workname = '%s-%s-%s' % (stage.name, project.name, project.release_tag)
2625  workname = workname + os.path.splitext(stage.script)[1]
2626  #workscript = os.path.join(tmpworkdir, workname)
2627  workscript = os.path.join(tmpdir, workname)
2628  if stage.script != workscript:
2629  larbatch_posix.copy(stage.script, workscript)
2630 
2631  # Copy and rename sam start project script to work directory.
2632 
2633  workstartscript = ''
2634  workstartname = ''
2635  if stage.start_script != '':
2636  workstartname = 'start-%s' % workname
2637  #workstartscript = os.path.join(tmpworkdir, workstartname)
2638  workstartscript = os.path.join(tmpdir, workstartname)
2639  if stage.start_script != workstartscript:
2640  larbatch_posix.copy(stage.start_script, workstartscript)
2641 
2642  # Copy and rename sam stop project script to work directory.
2643 
2644  workstopscript = ''
2645  workstopname = ''
2646  if stage.stop_script != '':
2647  workstopname = 'stop-%s' % workname
2648  #workstopscript = os.path.join(tmpworkdir, workstopname)
2649  workstopscript = os.path.join(tmpdir, workstopname)
2650  if stage.stop_script != workstopscript:
2651  larbatch_posix.copy(stage.stop_script, workstopscript)
2652 
2653  # Copy worker initialization scripts to work directory.
2654 
2655  for init_script in stage.init_script:
2656  if init_script != '':
2657  if not larbatch_posix.exists(init_script):
2658  raise RuntimeError('Worker initialization script %s does not exist.\n' % \
2659  init_script)
2660  work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
2661  if init_script != work_init_script:
2662  larbatch_posix.copy(init_script, work_init_script)
2663 
2664  # Update stage.init_script from list to single script.
2665 
2666  n = len(stage.init_script)
2667  if n == 0:
2668  stage.init_script = ''
2669  elif n == 1:
2670  stage.init_script = stage.init_script[0]
2671  else:
2672 
2673  # If there are multiple init scripts, generate a wrapper init script init_wrapper.sh.
2674 
2675  work_init_wrapper = os.path.join(tmpworkdir, 'init_wrapper.sh')
2676  f = open(work_init_wrapper, 'w')
2677  f.write('#! /bin/bash\n')
2678  for init_script in stage.init_script:
2679  f.write('echo\n')
2680  f.write('echo "Executing %s"\n' % os.path.basename(init_script))
2681  f.write('./%s\n' % os.path.basename(init_script))
2682  f.write('status=$?\n')
2683  f.write('echo "%s finished with status $status"\n' % os.path.basename(init_script))
2684  f.write('if [ $status -ne 0 ]; then\n')
2685  f.write(' exit $status\n')
2686  f.write('fi\n')
2687  f.write('echo\n')
2688  f.write('echo "Done executing initialization scripts."\n')
2689  f.close()
2690  stage.init_script = work_init_wrapper
2691 
2692  # Copy worker initialization source scripts to work directory.
2693 
2694  for init_source in stage.init_source:
2695  if init_source != '':
2696  if not larbatch_posix.exists(init_source):
2697  raise RuntimeError('Worker initialization source script %s does not exist.\n' % \
2698  init_source)
2699  work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
2700  if init_source != work_init_source:
2701  larbatch_posix.copy(init_source, work_init_source)
2702 
2703  # Update stage.init_source from list to single script.
2704 
2705  n = len(stage.init_source)
2706  if n == 0:
2707  stage.init_source = ''
2708  elif n == 1:
2709  stage.init_source = stage.init_source[0]
2710  else:
2711 
2712  # If there are multiple init source scripts, generate a wrapper init script
2713  # init_source_wrapper.sh.
2714 
2715  work_init_source_wrapper = os.path.join(tmpworkdir, 'init_source_wrapper.sh')
2716  f = open(work_init_source_wrapper, 'w')
2717  for init_source in stage.init_source:
2718  f.write('echo\n')
2719  f.write('echo "Sourcing %s"\n' % os.path.basename(init_source))
2720  f.write('source %s\n' % os.path.basename(init_source))
2721  f.write('echo\n')
2722  f.write('echo "Done sourcing initialization scripts."\n')
2723  f.close()
2724  stage.init_source = work_init_source_wrapper
2725 
2726  # Copy worker end-of-job scripts to work directory.
2727 
2728  for end_script in stage.end_script:
2729  if end_script != '':
2730  if not larbatch_posix.exists(end_script):
2731  raise RuntimeError('Worker end-of-job script %s does not exist.\n' % end_script)
2732  work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
2733  if end_script != work_end_script:
2734  larbatch_posix.copy(end_script, work_end_script)
2735 
2736  # Update stage.end_script from list to single script.
2737 
2738  n = len(stage.end_script)
2739  if n == 0:
2740  stage.end_script = ''
2741  elif n == 1:
2742  stage.end_script = stage.end_script[0]
2743  else:
2744 
2745  # If there are multiple end scripts, generate a wrapper end script end_wrapper.sh.
2746 
2747  work_end_wrapper = os.path.join(tmpworkdir, 'end_wrapper.sh')
2748  f = open(work_end_wrapper, 'w')
2749  f.write('#! /bin/bash\n')
2750  for end_script in stage.end_script:
2751  f.write('echo\n')
2752  f.write('echo "Executing %s"\n' % os.path.basename(end_script))
2753  f.write('./%s\n' % os.path.basename(end_script))
2754  f.write('status=$?\n')
2755  f.write('echo "%s finished with status $status"\n' % os.path.basename(end_script))
2756  f.write('if [ $status -ne 0 ]; then\n')
2757  f.write(' exit $status\n')
2758  f.write('fi\n')
2759  f.write('echo\n')
2760  f.write('echo "Done executing finalization scripts."\n')
2761  f.close()
2762  stage.end_script = work_end_wrapper
2763 
2764  # Copy worker midstage source initialization scripts to work directory.
2765 
2766  for istage in stage.mid_source:
2767  for mid_source in stage.mid_source[istage]:
2768  if mid_source != '':
2769  if not larbatch_posix.exists(mid_source):
2770  raise RuntimeError('Worker midstage initialization source script %s does not exist.\n' % mid_source)
2771  work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
2772  if mid_source != work_mid_source:
2773  larbatch_posix.copy(mid_source, work_mid_source)
2774 
2775  # Generate midstage source initialization wrapper script mid_source_wrapper.sh
2776  # and update stage.mid_script to point to wrapper.
2777  # Note that the variable $stage should be defined externally to this script.
2778 
2779  if len(stage.mid_source) > 0:
2780  work_mid_source_wrapper = os.path.join(tmpworkdir, 'mid_source_wrapper.sh')
2781  f = open(work_mid_source_wrapper, 'w')
2782  for istage in stage.mid_source:
2783  for mid_source in stage.mid_source[istage]:
2784  f.write('if [ $stage -eq %d ]; then\n' % istage)
2785  f.write(' echo\n')
2786  f.write(' echo "Sourcing %s"\n' % os.path.basename(mid_source))
2787  f.write(' source %s\n' % os.path.basename(mid_source))
2788  f.write('fi\n')
2789  f.write('echo\n')
2790  f.write('echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
2791  f.close()
2792  stage.mid_source = work_mid_source_wrapper
2793  else:
2794  stage.mid_source = ''
2795 
2796  # Copy worker midstage finalization scripts to work directory.
2797 
2798  for istage in stage.mid_script:
2799  for mid_script in stage.mid_script[istage]:
2800  if mid_script != '':
2801  if not larbatch_posix.exists(mid_script):
2802  raise RuntimeError('Worker midstage finalization script %s does not exist.\n' % mid_script)
2803  work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
2804  if mid_script != work_mid_script:
2805  larbatch_posix.copy(mid_script, work_mid_script)
2806 
2807  # Generate midstage finalization wrapper script mid_wrapper.sh and update stage.mid_script
2808  # to point to wrapper.
2809 
2810  if len(stage.mid_script) > 0:
2811  work_mid_wrapper = os.path.join(tmpworkdir, 'mid_wrapper.sh')
2812  f = open(work_mid_wrapper, 'w')
2813  f.write('#! /bin/bash\n')
2814  f.write('stage=$1\n')
2815  for istage in stage.mid_script:
2816  for mid_script in stage.mid_script[istage]:
2817  f.write('if [ $stage -eq %d ]; then\n' % istage)
2818  f.write(' echo\n')
2819  f.write(' echo "Executing %s"\n' % os.path.basename(mid_script))
2820  f.write(' ./%s\n' % os.path.basename(mid_script))
2821  f.write(' status=$?\n')
2822  f.write(' echo "%s finished with status $status"\n' % os.path.basename(mid_script))
2823  f.write(' if [ $status -ne 0 ]; then\n')
2824  f.write(' exit $status\n')
2825  f.write(' fi\n')
2826  f.write('fi\n')
2827  f.write('echo\n')
2828  f.write('echo "Done executing midstage finalization scripts for stage $stage."\n')
2829  f.close()
2830  stage.mid_script = work_mid_wrapper
2831  else:
2832  stage.mid_script = ''
2833 
2834  # Copy helper scripts to work directory.
2835 
2836  helpers = ('root_metadata.py',
2837  'merge_json.py',
2838  'subruns.py',
2839  'validate_in_job.py',
2840  'mkdir.py',
2841  'emptydir.py',
2842  'file_to_url.sh')
2843 
2844  for helper in helpers:
2845 
2846  # Find helper script in execution path.
2847 
2848  jobinfo = subprocess.Popen(['which', helper],
2849  stdout=subprocess.PIPE,
2850  stderr=subprocess.PIPE)
2851  jobout, joberr = jobinfo.communicate()
2852  jobout = convert_str(jobout)
2853  joberr = convert_str(joberr)
2854  rc = jobinfo.poll()
2855  helper_path = jobout.splitlines()[0].strip()
2856  if rc == 0:
2857  work_helper = os.path.join(tmpworkdir, helper)
2858  if helper_path != work_helper:
2859  larbatch_posix.copy(helper_path, work_helper)
2860  else:
2861  print('Helper script %s not found.' % helper)
2862 
2863  # Copy helper python modules to work directory.
2864  # Note that for this to work, these modules must be single files.
2865 
2866  helper_modules = ('larbatch_posix',
2867  'project_utilities',
2868  'larbatch_utilities',
2869  'experiment_utilities',
2870  'extractor_dict')
2871 
2872  for helper_module in helper_modules:
2873 
2874  # Find helper module files.
2875 
2876  jobinfo = subprocess.Popen(['python'],
2877  stdin=subprocess.PIPE,
2878  stdout=subprocess.PIPE,
2879  stderr=subprocess.PIPE)
2880  cmd = 'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
2881  jobinfo.stdin.write(convert_bytes(cmd))
2882  jobout, joberr = jobinfo.communicate()
2883  jobout = convert_str(jobout)
2884  joberr = convert_str(joberr)
2885  rc = jobinfo.poll()
2886  helper_path = jobout.splitlines()[-1].strip()
2887  if rc == 0:
2888  #print 'helper_path = %s' % helper_path
2889  work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
2890  if helper_path != work_helper:
2891  larbatch_posix.copy(helper_path, work_helper)
2892  else:
2893  print('Helper python module %s not found.' % helper_module)
2894 
2895  # If this is a makeup action, find list of missing files.
2896  # If sam information is present (cpids.list), create a makeup dataset.
2897 
2898  if makeup:
2899 
2900  checked_file = os.path.join(stage.bookdir, 'checked')
2901  if not larbatch_posix.exists(checked_file):
2902  raise RuntimeError('Wait for any running jobs to finish and run project.py --check')
2903  makeup_count = 0
2904 
2905  # First delete bad worker subdirectories.
2906 
2907  bad_filename = os.path.join(stage.bookdir, 'bad.list')
2908  if larbatch_posix.exists(bad_filename):
2909  lines = larbatch_posix.readlines(bad_filename)
2910  for line in lines:
2911  bad_subdir = line.strip()
2912  if bad_subdir != '':
2913  bad_path = os.path.join(stage.outdir, bad_subdir)
2914  if larbatch_posix.exists(bad_path):
2915  print('Deleting %s' % bad_path)
2916  larbatch_posix.rmtree(bad_path)
2917  bad_path = os.path.join(stage.logdir, bad_subdir)
2918  if larbatch_posix.exists(bad_path):
2919  print('Deleting %s' % bad_path)
2920  larbatch_posix.rmtree(bad_path)
2921  bad_path = os.path.join(stage.bookdir, bad_subdir)
2922  if larbatch_posix.exists(bad_path):
2923  print('Deleting %s' % bad_path)
2924  larbatch_posix.rmtree(bad_path)
2925 
2926  # Get a list of missing files, if any, for file list input.
2927  # Regenerate the input file list in the work directory, and
2928  # set the makeup job count.
2929 
2930  missing_files = []
2931  if stage.inputdef == '':
2932  missing_filename = os.path.join(stage.bookdir, 'missing_files.list')
2933  if larbatch_posix.exists(missing_filename):
2934  lines = larbatch_posix.readlines(missing_filename)
2935  for line in lines:
2936  words = line.split()
2937  if len(words) > 0:
2938  missing_files.append(words[0])
2939  makeup_count = len(missing_files)
2940  print('Makeup list contains %d files.' % makeup_count)
2941 
2942  if input_list_name != '':
2943  work_list_name = os.path.join(tmpworkdir, input_list_name)
2944  if larbatch_posix.exists(work_list_name):
2945  larbatch_posix.remove(work_list_name)
2946  work_list = safeopen(work_list_name)
2947  for missing_file in missing_files:
2948  work_list.write('%s\n' % missing_file)
2949  work_list.close()
2950 
2951  # In the case of makeup generation jobs, produce a procmap file
2952  # for the missing jobs, which ensures that made-up generation
2953  # jobs get a unique subrun.
2954 
2955  if stage.inputdef == '' and stage.inputfile == '' and stage.inputlist == '':
2956  procs = set(range(stage.num_jobs))
2957 
2958  # Loop over good output files to extract existing
2959  # process numbers and determine missing process numbers.
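# The output subdirectory name is assumed to contain the process number as its second
# underscore-separated field (e.g. a hypothetical ".../20240101120000_7" gives process 7).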
2960 
2961  output_files = os.path.join(stage.bookdir, 'files.list')
2962  if larbatch_posix.exists(output_files):
2963  lines = larbatch_posix.readlines(output_files)
2964  for line in lines:
2965  dir = os.path.basename(os.path.dirname(line))
2966  dir_parts = dir.split('_')
2967  if len(dir_parts) > 1:
2968  proc = int(dir_parts[1])
2969  if proc in procs:
2970  procs.remove(proc)
2971  if len(procs) != makeup_count:
2972  raise RuntimeError('Makeup process list has different length than makeup count.')
2973 
2974  # Generate process map.
2975 
2976  if len(procs) > 0:
2977  procmap = 'procmap.txt'
2978  procmap_path = os.path.join(tmpworkdir, procmap)
2979  procmap_file = safeopen(procmap_path)
2980  for proc in procs:
2981  procmap_file.write('%d\n' % proc)
2982  procmap_file.close()
2983 
2984  # Prepare sam-related makeup information.
2985 
2986  import_samweb()
2987 
2988  # Get list of successful consumer process ids.
2989 
2990  cpids = []
2991  cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
2992  if larbatch_posix.exists(cpids_filename):
2993  cpids_files = larbatch_posix.readlines(cpids_filename)
2994  for line in cpids_files:
2995  cpids.append(line.strip())
2996 
2997  # Create makeup dataset definition.
2998 
2999  makeup_defname = ''
3000  if len(cpids) > 0:
3001  project_utilities.test_kca()
3002  makeup_defname = samweb.makeProjectName(stage.inputdef) + '_makeup'
3003 
3004  # Construct comma-separated list of consumer process ids.
3005 
3006  cpids_list = ''
3007  sep = ''
3008  for cpid in cpids:
3009  cpids_list = cpids_list + '%s%s' % (sep, cpid)
3010  sep = ','
3011 
3012  # Construct makeup dimension.
3013 
3014  dim = '(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)
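# Illustrative example (hypothetical names): for inputdef "mydef" and cpids 1001,1002 this yields
# "(defname: mydef) minus (consumer_process_id 1001,1002 and consumed_status consumed)".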
3015 
3016  # Create makeup dataset definition.
3017 
3018  print('Creating makeup sam dataset definition %s' % makeup_defname)
3019  project_utilities.test_kca()
3020  samweb.createDefinition(defname=makeup_defname, dims=dim)
3021  makeup_count = samweb.countFiles(defname=makeup_defname)
3022  print('Makeup dataset contains %d files.' % makeup_count)
3023 
3024  # Make a tarball out of all of the files in tmpworkdir, to be copied to stage.workdir.
3025 
3026  tmptar = '%s/work.tar' % tmpworkdir
3027  jobinfo = subprocess.Popen(['tar','-cf', tmptar, '-C', tmpworkdir,
3028  '--mtime=2018-01-01',
3029  '--exclude=work.tar', '.'],
3030  stdout=subprocess.PIPE,
3031  stderr=subprocess.PIPE)
3032  jobout, joberr = jobinfo.communicate()
3033  rc = jobinfo.poll()
3034  if rc != 0:
3035  raise RuntimeError('Failed to create work tarball in %s' % tmpworkdir)
3036 
3037  # Calculate the checksum of the tarball.
3038 
3039  hasher = hashlib.md5()
3040  f = open(tmptar, 'rb')
3041  buf = f.read(1024)
3042  while len(buf) > 0:
3043  hasher.update(buf)
3044  buf = f.read(1024)
3045  hash = hasher.hexdigest()
3046  f.close()
3047 
3048  # Transfer tarball to work directory.
3049  # Give the tarball a unique name based on its checksum.
3050  # Don't replace the tarball if it already exists.
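# The copied tarball is named work<md5-hex-digest>.tar, so identical payloads map to the same
# file name and a previously transferred tarball can be reused.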
3051 
3052  hashtar = '%s/work%s.tar' % (stage.workdir, hash)
3053  if not larbatch_posix.exists(hashtar):
3054  larbatch_posix.copy(tmptar, hashtar)
3055  jobsub_workdir_files_args.extend(['-f', hashtar])
3056 
3057  # Sam stuff.
3058 
3059  # Get input sam dataset definition name.
3060  # Can be from xml or a makeup dataset that we just created.
3061 
3062  inputdef = stage.inputdef
3063  if makeup and makeup_defname != '':
3064  inputdef = makeup_defname
3065 
3066  # Sam project name.
3067 
3068  prjname = ''
3069  if inputdef != '':
3070  import_samweb()
3071  project_utilities.test_kca()
3072  prjname = samweb.makeProjectName(inputdef)
3073 
3074  # Get mix input sam dataset definition name.
3075 
3076  mixprjname = ''
3077  if stage.mixinputdef != '':
3078  import_samweb()
3079  project_utilities.test_kca()
3080  mixprjname = 'mix_%s' % samweb.makeProjectName(stage.mixinputdef)
3081 
3082  # If the prestart flag is specified, start the sam project now.
3083 
3084  prj_started = False
3085  if prjname != '' and stage.prestart != 0:
3086  ok = project_utilities.start_project(inputdef, prjname,
3087  stage.num_jobs * stage.max_files_per_job,
3088  stage.recur, stage.filelistdef)
3089  if ok != 0:
3090  print('Failed to start project.')
3091  sys.exit(1)
3092  prj_started = True
3093 
3094  # Also start mix project, if any.
3095 
3096  if mixprjname != '' and prj_started:
3097  ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
3098  if ok != 0:
3099  print('Failed to start mix project.')
3100  sys.exit(1)
3101 
3102  # Get role
3103 
3104  role = project_utilities.get_role()
3105  if project.role != '':
3106  role = project.role
3107 
3108  # Construct jobsub command line for workers.
3109 
3110  command = ['jobsub_submit']
3111  command_njobs = 1
3112 
3113  # Jobsub options.
3114 
3115  command.append('--group=%s' % project_utilities.get_experiment())
3116  command.append('--role=%s' % role)
3117  command.extend(jobsub_workdir_files_args)
3118  if project.server != '-' and project.server != '':
3119  command.append('--jobsub-server=%s' % project.server)
3120  if stage.resource != '':
3121  command.append('--resource-provides=usage_model=%s' % stage.resource)
3122  elif project.resource != '':
3123  command.append('--resource-provides=usage_model=%s' % project.resource)
3124  if stage.lines != '':
3125  command.append('--lines=%s' % stage.lines)
3126  elif project.lines != '':
3127  command.append('--lines=%s' % project.lines)
3128  if stage.site != '':
3129  command.append('--site=%s' % stage.site)
3130  if stage.blacklist != '':
3131  command.append('--blacklist=%s' % stage.blacklist)
3132  if stage.cpu != 0:
3133  command.append('--cpu=%d' % stage.cpu)
3134  if stage.disk != '':
3135  command.append('--disk=%s' % stage.disk)
3136  if stage.memory != 0:
3137  command.append('--memory=%d' % stage.memory)
3138  if project.os != '':
3139  if stage.singularity == 0:
3140  command.append('--OS=%s' % project.os)
3141  else:
3142  p = project_utilities.get_singularity(project.os)
3143  if p != '':
3144  if (stage.num_jobs > 1 or project.force_dag) and \
3145  (inputdef != '' or stage.mixinputdef != '') :
3146  command.append(r"""--lines='+SingularityImage=\"%s\"'""" % p)
3147  else:
3148  command.append(r"""--lines='+SingularityImage="%s"'""" % p)
3149  else:
3150  raise RuntimeError('No singularity image found for %s' % project.os)
3151  if not stage.pubs_output:
3152  if not makeup:
3153  command_njobs = stage.num_jobs
3154  command.extend(['-N', '%d' % command_njobs])
3155  else:
3156  command_njobs = min(makeup_count, stage.num_jobs)
3157  command.extend(['-N', '%d' % command_njobs])
3158  else:
3159  if stage.inputdef != '':
3160  command_njobs = stage.num_jobs
3161  else:
3162  command_njobs = stage.num_jobs
3163  command.extend(['-N', '%d' % command_njobs])
3164  if stage.jobsub != '':
3165  for word in stage.jobsub.split():
3166  command.append(word)
3167  opt = project_utilities.default_jobsub_submit_options()
3168  if opt != '':
3169  for word in opt.split():
3170  command.append(word)
3171  if stage.cvmfs != 0:
3172  command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3173  if stage.stash != 0:
3174  command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3175  if stage.singularity != 0:
3176  command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3177 
3178  # Batch script.
3179 
3180  workurl = "file://%s" % workscript
3181  command.append(workurl)
3182 
3183  # Check if there is a request for a maximum number of files per job
3184  # and, if so, add that to the condor_lar.sh command line.
3185 
3186  if stage.max_files_per_job != 0:
3187  command_max_files_per_job = stage.max_files_per_job
3188  command.extend(['--nfile', '%d' % command_max_files_per_job])
3189  #print 'Setting the max files to %d' % command_max_files_per_job
3190 
3191  # Larsoft options.
3192 
3193  command.extend([' --group', project_utilities.get_experiment()])
3194  command.extend([' -g'])
3195  command.extend([' -c', 'wrapper.fcl'])
3196  command.extend([' --ups', ','.join(project.ups)])
3197  if project.release_tag != '':
3198  command.extend([' -r', project.release_tag])
3199  command.extend([' -b', project.release_qual])
3200  if project.local_release_dir != '':
3201  command.extend([' --localdir', project.local_release_dir])
3202  if project.local_release_tar != '':
3203  command.extend([' --localtar', project.local_release_tar])
3204  command.extend([' --workdir', stage.workdir])
3205  command.extend([' --outdir', stage.outdir])
3206  command.extend([' --logdir', stage.logdir])
3207  if stage.dirsize > 0:
3208  command.extend([' --dirsize', '%d' % stage.dirsize])
3209  if stage.dirlevels > 0:
3210  command.extend([' --dirlevels', '%d' % stage.dirlevels])
3211  if stage.exe:
3212  if type(stage.exe) == type([]):
3213  command.extend([' --exe', ':'.join(stage.exe)])
3214  else:
3215  command.extend([' --exe', stage.exe])
3216  if stage.schema != '':
3217  command.extend([' --sam_schema', stage.schema])
3218  if project.os != '':
3219  command.extend([' --os', project.os])
3220 
3221  # Set the process number for pubs jobs that are the first in the chain.
3222 
3223  if not stage.pubs_input and stage.pubs_output and stage.output_subruns[0] > 0:
3224  command.extend(['--process', '%d' % (stage.output_subruns[0]-1)])
3225 
3226  # Specify single worker mode in case of pubs output.
3227 
3228  if stage.dynamic:
3229  command.append('--single')
3230 
3231  if stage.inputfile != '':
3232  command.extend([' -s', stage.inputfile])
3233  elif input_list_name != '':
3234  command.extend([' -S', input_list_name])
3235  elif inputdef != '':
3236  command.extend([' --sam_defname', inputdef,
3237  ' --sam_project', prjname])
3238  if recur:
3239  command.extend([' --recur'])
3240  if stage.mixinputdef != '':
3241  command.extend([' --mix_defname', stage.mixinputdef,
3242  ' --mix_project', mixprjname])
3243  if stage.inputmode != '':
3244  command.extend([' --inputmode', stage.inputmode])
3245  command.extend([' -n', '%d' % stage.num_events])
3246  if stage.inputdef == '':
3247  command.extend([' --njobs', '%d' % stage.num_jobs ])
3248  for ftype in stage.datafiletypes:
3249  command.extend(['--data_file_type', ftype])
3250  if procmap != '':
3251  command.extend([' --procmap', procmap])
3252  if stage.output:
3253  if type(stage.output) == type([]):
3254  command.extend([' --output', ':'.join(stage.output)])
3255  else:
3256  command.extend([' --output', stage.output])
3257  if stage.TFileName != '':
3258  command.extend([' --TFileName', stage.TFileName])
3259  if stage.init_script != '':
3260  command.extend([' --init-script', os.path.basename(stage.init_script)])
3261  if stage.init_source != '':
3262  command.extend([' --init-source', os.path.basename(stage.init_source)])
3263  if stage.end_script != '':
3264  command.extend([' --end-script', os.path.basename(stage.end_script)])
3265  if stage.mid_source != '':
3266  command.extend([' --mid-source', os.path.basename(stage.mid_source)])
3267  if stage.mid_script != '':
3268  command.extend([' --mid-script', os.path.basename(stage.mid_script)])
3269  if abssetupscript != '':
3270  command.extend([' --init', abssetupscript])
3271 
3272 
3273  #print('Validation will be done on the worker node %d' % stage.validate_on_worker)
3274  if stage.validate_on_worker == 1:
3275  print('Validation will be done on the worker node %d' % stage.validate_on_worker)
3276  command.extend([' --validate'])
3277  command.extend([' --declare'])
3278  # Maintain parentage only if we have multiple fcl files and thus are running in multiple stages
3279  if type(stage.fclname) == type([]) and len(stage.fclname) > 1:
3280  command.extend([' --maintain_parentage'])
3281 
3282  if stage.copy_to_fts == 1:
3283  command.extend([' --copy'])
3284 
3285  # If input is from sam, also construct a dag file, or add --sam_start option.
3286 
3287  if (prjname != '' or mixprjname != '') and command_njobs == 1 and not project.force_dag and not prj_started:
3288  command.extend([' --sam_start',
3289  ' --sam_station', project_utilities.get_experiment(),
3290  ' --sam_group', project_utilities.get_experiment()])
3291 
3292 
3293  # At this point, the main batch worker command is complete.
3294  # Decide whether to submit this command stand alone or as part of a dag.
3295 
3296  start_commands = []
3297  stop_commands = []
3298  dag_prjs = []
3299  if command_njobs > 1 or project.force_dag:
3300  if inputdef != '':
3301  dag_prjs.append([inputdef, prjname])
3302  if stage.mixinputdef != '':
3303  dag_prjs.append([stage.mixinputdef, mixprjname])
3304 
3305  for dag_prj in dag_prjs:
3306 
3307  # At this point, it is an error if the start and stop project
3308  # scripts were not found.
3309 
3310  if workstartname == '' or workstopname == '':
3311  raise RuntimeError('Sam start or stop project script not found.')
3312 
3313  # Start project jobsub command.
3314 
3315  start_command = ['jobsub']
3316 
3317  # General options.
3318 
3319  start_command.append('--group=%s' % project_utilities.get_experiment())
3320  if setupscript != '':
3321  start_command.append('-f %s' % setupscript)
3322  #start_command.append('--role=%s' % role)
3323  if stage.resource != '':
3324  start_command.append('--resource-provides=usage_model=%s' % stage.resource)
3325  elif project.resource != '':
3326  start_command.append('--resource-provides=usage_model=%s' % project.resource)
3327  if stage.lines != '':
3328  start_command.append('--lines=%s' % stage.lines)
3329  elif project.lines != '':
3330  start_command.append('--lines=%s' % project.lines)
3331  if stage.site != '':
3332  start_command.append('--site=%s' % stage.site)
3333  if stage.blacklist != '':
3334  start_command.append('--blacklist=%s' % stage.blacklist)
3335  if project.os != '':
3336  if stage.singularity == 0:
3337  start_command.append('--OS=%s' % project.os)
3338  else:
3339  p = project_utilities.get_singularity(project.os)
3340  if p != '':
3341  start_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
3342  else:
3343  raise RuntimeError('No singularity image found for %s' % project.os)
3344  if stage.jobsub_start != '':
3345  for word in stage.jobsub_start.split():
3346  start_command.append(word)
3347  opt = project_utilities.default_jobsub_submit_options()
3348  if opt != '':
3349  for word in opt.split():
3350  start_command.append(word)
3351  if stage.cvmfs != 0:
3352  start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3353  if stage.stash != 0:
3354  start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3355  if stage.singularity != 0:
3356  start_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3357 
3358  # Start project script.
3359 
3360  workstarturl = "file://%s" % workstartscript
3361  start_command.append(workstarturl)
3362 
3363  # Sam options.
3364 
3365  start_command.extend([' --sam_station', project_utilities.get_experiment(),
3366  ' --sam_group', project_utilities.get_experiment(),
3367  ' --sam_defname', dag_prj[0],
3368  ' --sam_project', dag_prj[1],
3369  ' -g'])
3370  if recur:
3371  start_command.extend([' --recur'])
3372 
3373  if abssetupscript != '':
3374  start_command.extend([' --init', abssetupscript])
3375 
3376  if stage.num_jobs > 0 and stage.max_files_per_job > 0:
3377  start_command.extend([' --max_files', '%d' % (stage.num_jobs * stage.max_files_per_job)])
3378 
3379  if stage.prestagefraction > 0.:
3380  start_command.extend([' --prestage_fraction', '%f' % stage.prestagefraction])
3381 
3382  # Output directory.
3383 
3384  start_command.extend([' --logdir', stage.logdir])
3385 
3386  # Done with start command.
3387 
3388  if not prj_started or stage.prestagefraction > 0.:
3389  start_commands.append(start_command)
3390 
3391  # Stop project jobsub command.
3392 
3393  stop_command = ['jobsub']
3394 
3395  # General options.
3396 
3397  stop_command.append('--group=%s' % project_utilities.get_experiment())
3398  if setupscript != '':
3399  stop_command.append('-f %s' % setupscript)
3400  #stop_command.append('--role=%s' % role)
3401  if stage.resource != '':
3402  stop_command.append('--resource-provides=usage_model=%s' % stage.resource)
3403  elif project.resource != '':
3404  stop_command.append('--resource-provides=usage_model=%s' % project.resource)
3405  if stage.lines != '':
3406  stop_command.append('--lines=%s' % stage.lines)
3407  elif project.lines != '':
3408  stop_command.append('--lines=%s' % project.lines)
3409  if stage.site != '':
3410  stop_command.append('--site=%s' % stage.site)
3411  if stage.blacklist != '':
3412  stop_command.append('--blacklist=%s' % stage.blacklist)
3413  if project.os != '':
3414  if stage.singularity == 0:
3415  stop_command.append('--OS=%s' % project.os)
3416  else:
3417  p = project_utilities.get_singularity(project.os)
3418  if p != '':
3419  stop_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
3420  else:
3421  raise RuntimeError('No singularity image found for %s' % project.os)
3422  if stage.jobsub_start != '':
3423  for word in stage.jobsub_start.split():
3424  stop_command.append(word)
3425  opt = project_utilities.default_jobsub_submit_options()
3426  if opt != '':
3427  for word in opt.split():
3428  stop_command.append(word)
3429  if stage.cvmfs != 0:
3430  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
3431  if stage.stash != 0:
3432  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
3433  if stage.singularity != 0:
3434  stop_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
3435 
3436  # Stop project script.
3437 
3438  workstopurl = "file://%s" % workstopscript
3439  stop_command.append(workstopurl)
3440 
3441  # Sam options.
3442 
3443  stop_command.extend([' --sam_station', project_utilities.get_experiment(),
3444  ' --sam_project', dag_prj[1],
3445  ' -g'])
3446 
3447  # Output directory.
3448 
3449  stop_command.extend([' --logdir', stage.logdir])
3450 
3451  if abssetupscript != '':
3452  stop_command.extend([' --init', abssetupscript])
3453 
3454  # Done with stop command.
3455 
3456  stop_commands.append(stop_command)
3457 
3458  if len(start_commands) > 0 or len(stop_commands) > 0:
3459 
3460  # Create dagNabbit.py configuration script in the temporary directory.
3461 
3462  dagfilepath = os.path.join(tmpdir, 'submit.dag')
3463  dag = safeopen(dagfilepath)
3464  dag.write('<serial>\n')
3465 
3466  # Write start section.
3467 
3468  if len(start_commands) > 0:
3469  dag.write('\n<parallel>\n\n')
3470  for start_command in start_commands:
3471  first = True
3472  for word in start_command:
3473  if not first:
3474  dag.write(' ')
3475  dag.write(word)
3476  if word[:6] == 'jobsub':
3477  dag.write(' -n')
3478  first = False
3479  dag.write('\n\n')
3480  dag.write('</parallel>\n')
3481 
3482  # Write main section.
3483 
3484  dag.write('\n<parallel>\n\n')
3485  for process in range(command_njobs):
3486  #for process in range(1):
3487  first = True
3488  skip = False
3489  for word in command:
3490  if skip:
3491  skip = False
3492  else:
3493  if word == '-N':
3494  #if False:
3495  skip = True
3496  else:
3497  if not first:
3498  dag.write(' ')
3499  if word[:6] == 'jobsub':
3500  word = 'jobsub'
3501  if word[:7] == '--role=':
3502  word = ''
3503  if word.startswith('--jobsub-server='):
3504  word = ''
3505  word = project_utilities.dollar_escape(word)
3506  dag.write(word)
3507  if word[:6] == 'jobsub':
3508  dag.write(' -n')
3509  first = False
3510  dag.write(' --process %d\n' % process)
3511  dag.write('\n')
3512  dag.write('\n</parallel>\n')
3513 
3514  # Write stop section.
3515 
3516  if len(stop_commands) > 0:
3517  dag.write('\n<parallel>\n\n')
3518  for stop_command in stop_commands:
3519  first = True
3520  for word in stop_command:
3521  if not first:
3522  dag.write(' ')
3523  dag.write(word)
3524  if word[:6] == 'jobsub':
3525  dag.write(' -n')
3526  first = False
3527  dag.write('\n\n')
3528  dag.write('</parallel>\n')
3529 
3530  # Finish dag.
3531 
3532  dag.write('\n</serial>\n')
3533  dag.close()
3534 
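# For illustration, the generated submit.dag has roughly the following shape,
# assuming one sam start/stop project pair and two worker jobs; script names
# and options shown here are placeholders, not values produced above:
#
#   <serial>
#   <parallel>
#   jobsub -n --group=myexp ... file:///tmp/.../start-project.sh --sam_defname mydef ...
#   </parallel>
#   <parallel>
#   jobsub -n --group=myexp ... file:///tmp/.../condor_lar.sh ... --process 0
#   jobsub -n --group=myexp ... file:///tmp/.../condor_lar.sh ... --process 1
#   </parallel>
#   <parallel>
#   jobsub -n --group=myexp ... file:///tmp/.../stop-project.sh --sam_project myprj ...
#   </parallel>
#   </serial>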
3535  # Update the main submission command to use jobsub_submit_dag instead of jobsub_submit.
3536 
3537  command = ['jobsub_submit_dag']
3538  command.append('--group=%s' % project_utilities.get_experiment())
3539  if project.server != '-' and project.server != '':
3540  command.append('--jobsub-server=%s' % project.server)
3541  command.append('--role=%s' % role)
3542  dagfileurl = 'file://'+ dagfilepath
3543  command.append(dagfileurl)
3544 
3545  checked_file = os.path.join(stage.bookdir, 'checked')
3546 
3547  # Calculate submit timeout.
3548 
3549  submit_timeout = 3600000
3550  if prjname != '':
3551  submit_timeout += 1.0 * command_njobs
3552  if stage.jobsub_timeout > submit_timeout:
3553  submit_timeout = stage.jobsub_timeout
3554 
3555  # Submit jobs.
3556 
3557  if not makeup:
3558 
3559  # For submit action, invoke the job submission command.
3560 
3561  print('Invoke jobsub_submit')
3562  if dryrun:
3563  print(' '.join(command))
3564  else:
3565  q = queue.Queue()
3566  jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
3567  thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
3568  thread.start()
3569  thread.join(timeout=submit_timeout)
3570  if thread.is_alive():
3571  jobinfo.terminate()
3572  thread.join()
3573  rc = q.get()
3574  jobout = convert_str(q.get())
3575  joberr = convert_str(q.get())
3576  if larbatch_posix.exists(checked_file):
3577  larbatch_posix.remove(checked_file)
3578  if larbatch_posix.isdir(tmpdir):
3579  larbatch_posix.rmtree(tmpdir)
3580  if larbatch_posix.isdir(tmpworkdir):
3581  larbatch_posix.rmtree(tmpworkdir)
3582  if rc != 0:
3583  raise JobsubError(command, rc, jobout, joberr)
3584  for line in jobout.split('\n'):
3585  if "JobsubJobId" in line:
3586  jobid = line.strip().split()[-1]
3587  if not jobid:
3588  raise JobsubError(command, rc, jobout, joberr)
3589  print('jobsub_submit finished.')
3590 
3591  else:
3592 
3593  # For makeup action, abort if makeup job count is zero for some reason.
3594 
3595  if makeup_count > 0:
3596  if dryrun:
3597  print(' '.join(command))
3598  else:
3599  q = queue.Queue()
3600  jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
3601  thread = threading.Thread(target=project_utilities.wait_for_subprocess,
3602  args=[jobinfo, q])
3603  thread.start()
3604  thread.join(timeout=submit_timeout)
3605  if thread.is_alive():
3606  jobinfo.terminate()
3607  thread.join()
3608  rc = q.get()
3609  jobout = convert_str(q.get())
3610  joberr = convert_str(q.get())
3611  if larbatch_posix.exists(checked_file):
3612  larbatch_posix.remove(checked_file)
3613  if larbatch_posix.isdir(tmpdir):
3614  larbatch_posix.rmtree(tmpdir)
3615  if larbatch_posix.isdir(tmpworkdir):
3616  larbatch_posix.rmtree(tmpworkdir)
3617  if rc != 0:
3618  raise JobsubError(command, rc, jobout, joberr)
3619  for line in jobout.split('\n'):
3620  if "JobsubJobId" in line:
3621  jobid = line.strip().split()[-1]
3622  if not jobid:
3623  raise JobsubError(command, rc, jobout, joberr)
3624  else:
3625  print('Makeup action aborted because makeup job count is zero.')
3626 
3627  # Done.
3628 
3629  return jobid
3630 
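# A minimal sketch of the project_utilities.wait_for_subprocess helper used in
# dojobsub above. This is assumed behavior, inferred only from the order in
# which the queue is read back (return code, then stdout, then stderr); see
# project_utilities for the actual implementation.
#
#   def wait_for_subprocess(jobinfo, q):
#       jobout, joberr = jobinfo.communicate()
#       q.put(jobinfo.poll())
#       q.put(jobout)
#       q.put(joberr)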
3631 
3632 # Submit/makeup action.
3633 
3634 def dosubmit(project, stage, makeup=False, recur=False, dryrun=False):
3635 
3636  # Make sure we have a kerberos ticket.
3637 
3638  project_utilities.test_kca()
3639 
3640  # Make sure jobsub_client is available.
3641 
3642  larbatch_utilities.test_jobsub()
3643 
3644  # Run presubmission check script.
3645 
3646  ok = stage.checksubmit()
3647  if ok != 0:
3648  print('No jobs submitted.')
3649  return
3650 
3651  # In pubs mode, delete any existing work, log, or output
3652  # directories, since there is no separate makeup action for pubs
3653  # mode.
3654 
3655  if stage.pubs_output and not stage.dynamic:
3656  if larbatch_posix.exists(stage.workdir):
3657  larbatch_posix.rmtree(stage.workdir)
3658  if larbatch_posix.exists(stage.outdir):
3659  larbatch_posix.rmtree(stage.outdir)
3660  if larbatch_posix.exists(stage.logdir):
3661  larbatch_posix.rmtree(stage.logdir)
3662  if larbatch_posix.exists(stage.bookdir):
3663  larbatch_posix.rmtree(stage.bookdir)
3664 
3665  # Make or check directories.
3666 
3667  if not makeup:
3668  stage.makedirs()
3669  else:
3670  stage.checkdirs()
3671 
3672  # Check input files.
3673 
3674  ok = stage.checkinput(checkdef=True)
3675  if ok != 0:
3676  print('No jobs submitted.')
3677  return
3678 
3679  # Make sure output and log directories are empty (submit only).
3680 
3681  if not makeup and not recur and not stage.dynamic:
3682  if len(larbatch_posix.listdir(stage.outdir)) != 0:
3683  raise RuntimeError('Output directory %s is not empty.' % stage.outdir)
3684  if len(larbatch_posix.listdir(stage.logdir)) != 0:
3685  raise RuntimeError('Log directory %s is not empty.' % stage.logdir)
3686  if len(larbatch_posix.listdir(stage.bookdir)) != 0:
3687  raise RuntimeError('Book directory %s is not empty.' % stage.bookdir)
3688 
3689  # Copy files to workdir and issue jobsub command to submit jobs.
3690 
3691  jobid = dojobsub(project, stage, makeup, recur, dryrun)
3692 
3693  # Append jobid to file "jobids.list" in the log directory.
3694 
3695  jobids_filename = os.path.join(stage.bookdir, 'jobids.list')
3696  jobids = []
3697  if larbatch_posix.exists(jobids_filename):
3698  lines = larbatch_posix.readlines(jobids_filename)
3699  for line in lines:
3700  id = line.strip()
3701  if len(id) > 0:
3702  jobids.append(id)
3703  if len(jobid) > 0:
3704  jobids.append(jobid)
3705 
3706  jobid_file = safeopen(jobids_filename)
3707  for jobid in jobids:
3708  jobid_file.write('%s\n' % jobid)
3709  jobid_file.close()
3710 
3711  # Done.
3712 
3713  return jobid
3714 
3715 # Merge histogram files.
3716 # If mergehist is True, merge histograms using "hadd -T".
3717 # If mergentuple is True, do full merge using "hadd".
3718 # If neither argument is True, do custom merge using merge program specified
3719 # in xml stage.
3720 
3721 def domerge(stage, mergehist, mergentuple):
3722 
3723  hlist = []
3724  hnlist = os.path.join(stage.bookdir, 'filesana.list')
3725  if larbatch_posix.exists(hnlist):
3726  hlist = larbatch_posix.readlines(hnlist)
3727  else:
3728  raise RuntimeError('No filesana.list file found %s, run project.py --checkana' % hnlist)
3729 
3730  histurlsname_temp = 'histurls.list'
3731  histurls = safeopen(histurlsname_temp)
3732 
3733  for hist in hlist:
3734  histurls.write('%s\n' % hist)
3735  histurls.close()
3736 
3737  if len(hlist) > 0:
3738  name = os.path.join(stage.outdir, 'anahist.root')
3739  if name[0:6] == '/pnfs/':
3740  tempdir = '%s/mergentuple_%d_%d' % (project_utilities.get_scratch_dir(),
3741  os.getuid(),
3742  os.getpid())
3743  if not larbatch_posix.isdir(tempdir):
3744  larbatch_posix.makedirs(tempdir)
3745  name_temp = '%s/anahist.root' % tempdir
3746  else:
3747  name_temp = name
3748 
3749  if mergehist:
3750  mergecom = "hadd -T"
3751  elif mergentuple:
3752  mergecom = "hadd"
3753  else:
3754  mergecom = stage.merge
3755 
3756  print("Merging %d root files using %s." % (len(hlist), mergecom))
3757 
3758  if larbatch_posix.exists(name_temp):
3759  larbatch_posix.remove(name_temp)
3760  comlist = mergecom.split()
3761  comlist.extend(["-f", "-k", name_temp, '@' + histurlsname_temp])
3762  rc = subprocess.call(comlist, stdout=sys.stdout, stderr=sys.stderr)
3763  if rc != 0:
3764  print("%s exit status %d" % (mergecom, rc))
3765  if name != name_temp:
3766  if larbatch_posix.exists(name):
3767  larbatch_posix.remove(name)
3768  if larbatch_posix.exists(name_temp):
3769 
3770  # Copy merged file.
3771  larbatch_posix.copy(name_temp, name)
3772  larbatch_posix.rmtree(tempdir)
3773  larbatch_posix.remove(histurlsname_temp)
3774 
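# For illustration, with mergehist=True the command list built above expands to
# something like the following (the output path is hypothetical):
#
#   hadd -T -f -k /pnfs/myexp/scratch/users/me/out/anahist.root @histurls.list
#
# where histurls.list contains one analysis root file path per line, taken from
# filesana.list in the stage book directory.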
3775 
3776 # Sam audit.
3777 
3778 def doaudit(stage):
3779 
3780  import_samweb()
3781  stage_has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
3782  if not stage_has_input:
3783  raise RuntimeError('No auditing for generator stage.')
3784 
3785  # Are there ways to get output files other than through the dataset definition?
3786 
3787  outputlist = []
3788  outparentlist = []
3789  if stage.defname != '':
3790  query = 'isparentof: (defname: %s) and availability: anylocation' %(stage.defname)
3791  try:
3792  outparentlist = samweb.listFiles(dimensions=query)
3793  outputlist = samweb.listFiles(defname=stage.defname)
3794  except:
3795  raise RuntimeError('Error accessing sam information for definition %s.\nDoes definition exist?' % stage.defname)
3796  else:
3797  raise RuntimeError('Output definition not found.')
3798 
3799  # To get input files one can use definition or get inputlist given to that stage or
3800  # get input files for a given stage as get_input_files(stage)
3801 
3802  inputlist = []
3803  if stage.inputdef != '':
3804  import_samweb()
3805  inputlist=samweb.listFiles(defname=stage.inputdef)
3806  elif stage.inputlist != '':
3807  ilist = []
3808  if larbatch_posix.exists(stage.inputlist):
3809  ilist = larbatch_posix.readlines(stage.inputlist)
3810  inputlist = []
3811  for i in ilist:
3812  inputlist.append(os.path.basename(i.strip()))
3813  else:
3814  raise RuntimeError('Input definition and/or input list does not exist.')
3815 
3816  difflist = set(inputlist)^set(outparentlist)
3817  mc = 0
3818  me = 0
3819  for item in difflist:
3820  if item in inputlist:
3821  mc = mc+1
3822  if mc==1:
3823  missingfilelistname = os.path.join(stage.bookdir, 'missingfiles.list')
3824  missingfilelist = safeopen(missingfilelistname)
3825  if mc>=1:
3826  missingfilelist.write("%s\n" %item)
3827  elif item in outparentlist:
3828  me = me+1
3829  childcmd = 'samweb list-files "ischildof: (file_name=%s) and availability: physical"' %(item)
3830  children = convert_str(subprocess.check_output(childcmd, shell=True)).splitlines()
3831  rmfile = list(set(children) & set(outputlist))[0]
3832  if me ==1:
3833  flist = []
3834  fnlist = os.path.join(stage.bookdir, 'files.list')
3835  if larbatch_posix.exists(fnlist):
3836  flist = larbatch_posix.readlines(fnlist)
3837  slist = []
3838  for line in flist:
3839  slist.append(line.split()[0])
3840  else:
3841  raise RuntimeError('No files.list file found %s, run project.py --check' % fnlist)
3842 
3843  # Declare the content status of the file as bad in SAM.
3844 
3845  sdict = {'content_status':'bad'}
3846  project_utilities.test_kca()
3847  samweb.modifyFileMetadata(rmfile, sdict)
3848  print('\nDeclaring the status of the following file as bad:', rmfile)
3849 
3850  # Remove this file from the files.list in the output directory.
3851 
3852  fn = []
3853  fn = [x for x in slist if os.path.basename(x.strip()) != rmfile]
3854  thefile = safeopen(fnlist)
3855  for item in fn:
3856  thefile.write("%s\n" % item)
3857 
3858  if mc==0 and me==0:
3859  print("Everything in order.")
3860  return 0
3861  else:
3862  print('Missing parent file(s) = ', mc)
3863  print('Extra parent file(s) = ',me)
3864 
3865  if mc != 0:
3866  missingfilelist.close()
3867  print("Creating missingfiles.list in the output directory....done!")
3868  if me != 0:
3869  thefile.close()
3870  #larbatch_posix.remove("jsonfile.json")
3871  print("For extra parent files, files.list redefined and content status declared as bad in SAM...done!")
3872 
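# For illustration, the audit above hinges on the symmetric difference between
# the input file set and the set of parents of the output dataset:
#
#   set(['a.root', 'b.root']) ^ set(['b.root', 'c.root'])  ->  {'a.root', 'c.root'}
#
# 'a.root' (an input with no declared child) is counted in mc and written to
# missingfiles.list; 'c.root' (a declared parent not present in the input list)
# is counted in me, its child file is declared bad in sam, and files.list is
# rewritten without that child.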
3873 
3874 # Print help.
3875 
3876 def help():
3877 
3878  filename = sys.argv[0]
3879  file = open(filename, 'r')
3880 
3881  doprint=0
3882 
3883  for line in file.readlines():
3884  if line[2:12] == 'project.py':
3885  doprint = 1
3886  elif line[0:6] == '######' and doprint:
3887  doprint = 0
3888  if doprint:
3889  if len(line) > 2:
3890  print(line[2:], end=' ')
3891  else:
3892  print()
3893 
3894 # Normalize xml path.
3895 #
3896 # Don't modify xml file path for any of the following cases.
3897 #
3898 # 1. xmlfile contains character ':'. In this case xmlfile may be a url.
3899 # 2. xmlfile starts with '/', './' or '../'.
3900 # 3. xmlfile is '-'. Stands for standard input.
3901 #
3902 # Otherwise, assume that xmlfile is a relative path. In this case, convert it to
3903 # an absolute path relative to the current working directory, or to a directory listed
3904 # in the environment variable XMLPATH (colon-separated list of directories).
3905 
3906 def normxmlpath(xmlfile):
3907 
3908  # Default result = input.
3909 
3910  normxmlfile = xmlfile
3911 
3912  # Does this look like a relative path?
3913 
3914  if xmlfile.find(':') < 0 and \
3915  not xmlfile.startswith('/') and \
3916  not xmlfile.startswith('./') and \
3917  not xmlfile.startswith('../') and \
3918  xmlfile != '-':
3919 
3920  # Yes, try to normalize path.
3921  # Construct a list of directories to search, starting with current working directory.
3922 
3923  dirs = [os.getcwd()]
3924 
3925  # Add directories in environment variable XMLPATH, if defined.
3926 
3927  if 'XMLPATH' in os.environ:
3928  dirs.extend(os.environ['XMLPATH'].split(':'))
3929 
3930  # Loop over directories.
3931 
3932  for dir in dirs:
3933  xmlpath = os.path.join(dir, xmlfile)
3934  if os.path.exists(xmlpath):
3935  normxmlfile = xmlpath
3936  break
3937 
3938  # Done.
3939 
3940  return normxmlfile
3941 
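# For illustration (hypothetical paths), a bare file name is resolved against
# the current directory first and then each XMLPATH entry in order:
#
#   os.environ['XMLPATH'] = '/home/me/xml:/exp/shared/xml'
#   normxmlpath('prod.xml')         # -> '/home/me/xml/prod.xml' if not in $PWD
#   normxmlpath('./prod.xml')       # starts with './', returned unchanged
#   normxmlpath('http://x/y.xml')   # contains ':', returned unchanged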
3942 # Print xml help.
3943 
3944 def xmlhelp():
3945 
3946  filename = sys.argv[0]
3947  file = open(filename, 'r')
3948 
3949  doprint=0
3950 
3951  for line in file.readlines():
3952  if line[2:20] == 'XML file structure':
3953  doprint = 1
3954  elif line[0:6] == '######' and doprint:
3955  doprint = 0
3956  if doprint:
3957  if len(line) > 2:
3958  print(line[2:], end=' ')
3959  else:
3960  print()
3961 
3962 
3963 # Main program.
3964 
3965 def main(argv):
3966 
3967  # Parse arguments.
3968 
3969  xmlfile = ''
3970  projectname = ''
3971  stagenames = ['']
3972  lines = ''
3973  site = ''
3974  cpu = 0
3975  disk = ''
3976  memory = 0
3977  inputdef = ''
3978  merge = 0
3979  submit = 0
3980  recur = 0
3981  pubs = 0
3982  pubs_run = 0
3983  pubs_subruns = []
3984  pubs_version = None
3985  check = 0
3986  checkana = 0
3987  shorten = 0
3988  fetchlog = 0
3989  mergehist = 0
3990  mergentuple = 0
3991  audit = 0
3992  stage_status = 0
3993  makeup = 0
3994  clean = 0
3995  clean_one = 0
3996  dump_project = 0
3997  dump_stage = 0
3998  dryrun = 0
3999  nocheck = 0
4000  print_outdir = 0
4001  print_logdir = 0
4002  print_workdir = 0
4003  print_bookdir = 0
4004  fcl = 0
4005  defname = 0
4006  do_input_files = 0
4007  do_check_submit = 0
4008  do_check_input = 0
4009  declare = 0
4010  declare_ana = 0
4011  define = 0
4012  define_ana = 0
4013  undefine = 0
4014  check_declarations = 0
4015  check_declarations_ana = 0
4016  test_declarations = 0
4017  test_declarations_ana = 0
4018  check_definition = 0
4019  check_definition_ana = 0
4020  test_definition = 0
4021  test_definition_ana = 0
4022  add_locations = 0
4023  add_locations_ana = 0
4024  check_locations = 0
4025  check_locations_ana = 0
4026  upload = 0
4027  upload_ana = 0
4028  check_tape = 0
4029  check_tape_ana = 0
4030  clean_locations = 0
4031  clean_locations_ana = 0
4032  remove_locations = 0
4033  remove_locations_ana = 0
4034 
4035  args = argv[1:]
4036  while len(args) > 0:
4037  if args[0] == '-h' or args[0] == '--help' :
4038  help()
4039  return 0
4040  elif args[0] == '-xh' or args[0] == '--xmlhelp' :
4041  xmlhelp()
4042  return 0
4043  elif args[0] == '--xml' and len(args) > 1:
4044  xmlfile = args[1]
4045  del args[0:2]
4046  elif args[0] == '--project' and len(args) > 1:
4047  projectname = args[1]
4048  del args[0:2]
4049  elif args[0] == '--stage' and len(args) > 1:
4050  stagenames = args[1].split(',')
4051  del args[0:2]
4052  elif args[0] == '--tmpdir' and len(args) > 1:
4053  os.environ['TMPDIR'] = args[1]
4054  del args[0:2]
4055  elif args[0] == '--lines' and len(args) > 1:
4056  lines = args[1]
4057  del args[0:2]
4058  elif args[0] == '--site' and len(args) > 1:
4059  site = args[1]
4060  del args[0:2]
4061  elif args[0] == '--cpu' and len(args) > 1:
4062  cpu = int(args[1])
4063  del args[0:2]
4064  elif args[0] == '--disk' and len(args) > 1:
4065  disk = args[1]
4066  del args[0:2]
4067  elif args[0] == '--memory' and len(args) > 1:
4068  memory = int(args[1])
4069  del args[0:2]
4070  elif args[0] == '--inputdef' and len(args) > 1:
4071  inputdef = args[1]
4072  del args[0:2]
4073  elif args[0] == '--submit':
4074  submit = 1
4075  del args[0]
4076  elif args[0] == '--recur':
4077  recur = 1
4078  del args[0]
4079  elif args[0] == '--pubs' and len(args) > 2:
4080  pubs = 1
4081  pubs_run = int(args[1])
4082  pubs_subruns = project_utilities.parseInt(args[2])
4083  del args[0:3]
4084  if len(args) > 0 and args[0] != '' and args[0][0] != '-':
4085  pubs_version = int(args[0])
4086  del args[0]
4087  elif args[0] == '--check':
4088  check = 1
4089  del args[0]
4090  elif args[0] == '--checkana':
4091  checkana = 1
4092  del args[0]
4093  elif args[0] == '--shorten':
4094  shorten = 1
4095  del args[0]
4096  elif args[0] == '--fetchlog':
4097  fetchlog = 1
4098  del args[0]
4099  elif args[0] == '--merge':
4100  merge = 1
4101  del args[0]
4102  elif args[0] == '--mergehist':
4103  mergehist = 1
4104  del args[0]
4105  elif args[0] == '--mergentuple':
4106  mergentuple = 1
4107  del args[0]
4108  elif args[0] == '--audit':
4109  audit = 1
4110  del args[0]
4111  elif args[0] == '--status':
4112  stage_status = 1
4113  del args[0]
4114  elif args[0] == '--makeup':
4115  makeup = 1
4116  del args[0]
4117  elif args[0] == '--clean':
4118  clean = 1
4119  del args[0]
4120  elif args[0] == '--clean_one':
4121  clean_one = 1
4122  del args[0]
4123  elif args[0] == '--dump_project':
4124  dump_project = 1
4125  del args[0]
4126  elif args[0] == '--dump_stage':
4127  dump_stage = 1
4128  del args[0]
4129  elif args[0] == '--dryrun':
4130  dryrun = 1
4131  del args[0]
4132  elif args[0] == '--nocheck':
4133  nocheck = 1
4134  del args[0]
4135  elif args[0] == '--outdir':
4136  print_outdir = 1
4137  del args[0]
4138  elif args[0] == '--logdir':
4139  print_logdir = 1
4140  del args[0]
4141  elif args[0] == '--workdir':
4142  print_workdir = 1
4143  del args[0]
4144  elif args[0] == '--bookdir':
4145  print_bookdir = 1
4146  del args[0]
4147  elif args[0] == '--fcl':
4148  fcl = 1
4149  del args[0]
4150  elif args[0] == '--defname':
4151  defname = 1
4152  del args[0]
4153  elif args[0] == '--input_files':
4154  do_input_files = 1
4155  del args[0]
4156  elif args[0] == '--check_submit':
4157  do_check_submit = 1
4158  del args[0]
4159  elif args[0] == '--check_input':
4160  do_check_input = 1
4161  del args[0]
4162  elif args[0] == '--declare':
4163  declare = 1
4164  del args[0]
4165  elif args[0] == '--declare_ana':
4166  declare_ana = 1
4167  del args[0]
4168  elif args[0] == '--define':
4169  define = 1
4170  del args[0]
4171  elif args[0] == '--define_ana':
4172  define_ana = 1
4173  del args[0]
4174  elif args[0] == '--undefine':
4175  undefine = 1
4176  del args[0]
4177  elif args[0] == '--check_declarations':
4178  check_declarations = 1
4179  del args[0]
4180  elif args[0] == '--check_declarations_ana':
4181  check_declarations_ana = 1
4182  del args[0]
4183  elif args[0] == '--test_declarations':
4184  test_declarations = 1
4185  del args[0]
4186  elif args[0] == '--test_declarations_ana':
4187  test_declarations_ana = 1
4188  del args[0]
4189  elif args[0] == '--check_definition':
4190  check_definition = 1
4191  del args[0]
4192  elif args[0] == '--check_definition_ana':
4193  check_definition_ana = 1
4194  del args[0]
4195  elif args[0] == '--test_definition':
4196  test_definition = 1
4197  del args[0]
4198  elif args[0] == '--test_definition_ana':
4199  test_definition_ana = 1
4200  del args[0]
4201  elif args[0] == '--add_locations':
4202  add_locations = 1
4203  del args[0]
4204  elif args[0] == '--add_locations_ana':
4205  add_locations_ana = 1
4206  del args[0]
4207  elif args[0] == '--check_locations':
4208  check_locations = 1
4209  del args[0]
4210  elif args[0] == '--check_locations_ana':
4211  check_locations_ana = 1
4212  del args[0]
4213  elif args[0] == '--upload':
4214  upload = 1
4215  del args[0]
4216  elif args[0] == '--upload_ana':
4217  upload_ana = 1
4218  del args[0]
4219  elif args[0] == '--check_tape':
4220  check_tape = 1
4221  del args[0]
4222  elif args[0] == '--check_tape_ana':
4223  check_tape_ana = 1
4224  del args[0]
4225  elif args[0] == '--clean_locations':
4226  clean_locations = 1
4227  del args[0]
4228  elif args[0] == '--clean_locations_ana':
4229  clean_locations_ana = 1
4230  del args[0]
4231  elif args[0] == '--remove_locations':
4232  remove_locations = 1
4233  del args[0]
4234  elif args[0] == '--remove_locations_ana':
4235  remove_locations_ana = 1
4236  del args[0]
4237  else:
4238  print('Unknown option %s' % args[0])
4239  return 1
4240 
4241  # Normalize xml file path.
4242 
4243  xmlfile = normxmlpath(xmlfile)
4244 
4245  # Make sure xmlfile was specified.
4246 
4247  if xmlfile == '':
4248  print('No xml file specified. Type "project.py -h" for help.')
4249  return 1
4250 
4251  # Make sure that no more than one action was specified (except clean, shorten, and info
4252  # options).
4253 
4254  num_action = submit + check + checkana + fetchlog + merge + mergehist + mergentuple + audit + stage_status + makeup + define + define_ana + undefine + declare + declare_ana
4255  if num_action > 1:
4256  print('More than one action was specified.')
4257  return 1
4258 
4259  # Extract all project definitions.
4260 
4261  projects = get_projects(xmlfile, check=(not nocheck))
4262 
4263  # Get the selected project element.
4264 
4265  for stagename in stagenames:
4266  project = select_project(projects, projectname, stagename)
4267  if project != None:
4268  if projectname == '':
4269  projectname = project.name
4270  else:
4271  raise RuntimeError('No project selected.\n')
4272 
4273  # Do clean action now. Cleaning can be combined with submission.
4274 
4275  if clean:
4276  for stagename in stagenames:
4277  docleanx(projects, projectname, stagename, clean_descendants = True)
4278 
4279  # Do clean_one action now. Cleaning can be combined with submission.
4280 
4281  if clean_one:
4282  for stagename in stagenames:
4283  docleanx(projects, projectname, stagename, clean_descendants = False)
4284 
4285  # Do stage_status now.
4286 
4287  if stage_status:
4288  dostatus(projects)
4289  return 0
4290 
4291  # Get the current stage definition, and pubsify it if necessary.
4292  # Also process any command line stage configuration overrides.
4293 
4294  stages = {}
4295  for stagename in stagenames:
4296  stage = project.get_stage(stagename)
4297  stages[stagename] = stage
4298 
4299  # Command line configuration overrides handled here.
4300 
4301  if lines != '':
4302  stage.lines = lines
4303  if site != '':
4304  stage.site = site
4305  if cpu != 0:
4306  stage.cpu = cpu
4307  if disk != '':
4308  stage.disk = disk
4309  if memory != 0:
4310  stage.memory = memory
4311  if inputdef != '':
4312  stage.inputdef = inputdef
4313  stage.inputfile = ''
4314  stage.inputlist = ''
4315  if recur != 0:
4316  stage.recur = recur
4317 
4318  # Pubs mode overrides handled here.
4319 
4320  if pubs:
4321  stage.pubsify_input(pubs_run, pubs_subruns, pubs_version)
4322  stage.pubsify_output(pubs_run, pubs_subruns, pubs_version)
4323 
4324  # Make recursive dataset definition here, if necessary.
4325 
4326  if stage.recur and stage.inputdef != '' and stage.basedef != '':
4327 
4328  # First check if stage.inputdef already exists.
4329 
4330  import_samweb()
4331  def_exists = False
4332  try:
4333  desc = samweb.descDefinition(defname=stage.inputdef)
4334  def_exists = True
4335  except samweb_cli.exceptions.DefinitionNotFound:
4336  pass
4337 
4338  if not def_exists:
4339 
4340  # Recursive definition doesn't exist, so create it.
4341 
4342  project_utilities.test_kca()
4343 
4344  # Start sam dimension with the base dataset.
4345 
4346  dim = ''
4347 
4348  # Add minus clause.
4349 
4350  project_wildcard = '%s_%%' % samweb.makeProjectName(stage.inputdef).rsplit('_',1)[0]
4351  if stage.recurtype == 'snapshot':
4352  dim = 'defname: %s minus snapshot_for_project_name %s' % \
4353  (stage.basedef, project_wildcard)
4354  elif stage.recurtype == 'consumed':
4355  dim = 'defname: %s minus (project_name %s and consumed_status consumed)' % \
4356  (stage.basedef, project_wildcard)
4357 
4358  elif stage.recurtype == 'child':
4359 
4360  # In case of multiple data streams, generate one clause for each
4361  # data stream.
4362 
4363  nstream = 1
4364  if stage.data_stream != None and len(stage.data_stream) > 0:
4365  nstream = len(stage.data_stream)
4366 
4367  dim = ''
4368  for istream in range(nstream):
4369  idim = project_utilities.dimensions_datastream(project, stage,
4370  ana=False, index=istream)
4371  if idim.find('anylocation') > 0:
4372  idim = idim.replace('anylocation', 'physical')
4373  else:
4374  idim += ' with availability physical'
4375 
4376  if len(dim) > 0:
4377  dim += ' or '
4378  dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
4379 
4380  if stage.activebase != '':
4381  activedef = '%s_active' % stage.activebase
4382  waitdef = '%s_wait' % stage.activebase
4383  dim += ' minus defname: %s' % activedef
4384  dim += ' minus defname: %s' % waitdef
4385  project_utilities.makeDummyDef(activedef)
4386  project_utilities.makeDummyDef(waitdef)
4387 
4388  elif stage.recurtype == 'anachild':
4389 
4390  # In case of multiple data streams, generate one clause for each
4391  # data stream.
4392 
4393  nstream = 1
4394  if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
4395  nstream = len(stage.ana_data_stream)
4396 
4397  dim = ''
4398  for istream in range(nstream):
4399  idim = project_utilities.dimensions_datastream(project, stage,
4400  ana=True, index=istream)
4401  if idim.find('anylocation') > 0:
4402  idim = idim.replace('anylocation', 'physical')
4403  else:
4404  idim += ' with availability physical'
4405 
4406  if len(dim) > 0:
4407  dim += ' or '
4408  dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
4409 
4410  if stage.activebase != '':
4411  activedef = '%s_active' % stage.activebase
4412  waitdef = '%s_wait' % stage.activebase
4413  dim += ' minus defname: %s' % activedef
4414  dim += ' minus defname: %s' % waitdef
4415  project_utilities.makeDummyDef(activedef)
4416  project_utilities.makeDummyDef(waitdef)
4417 
4418  elif stage.recurtype != '' and stage.recurtype != 'none':
4419  raise RuntimeError('Unknown recursive type %s.' % stage.recurtype)
4420 
4421  # Add "with limit" clause.
4422 
4423  if stage.recurlimit != 0:
4424  dim += ' with limit %d' % stage.recurlimit
4425 
4426  # Create definition.
4427 
4428  print('Creating recursive dataset definition %s' % stage.inputdef)
4429  project_utilities.test_kca()
4430  samweb.createDefinition(defname=stage.inputdef, dims=dim)
4431 
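# For illustration (hypothetical names), a 'snapshot' type recursive definition
# built above has a dimension string of the form:
#
#   defname: mybase_def minus snapshot_for_project_name myprefix_%
#
# where 'myprefix' is samweb.makeProjectName(stage.inputdef) with its final
# '_'-separated token dropped, and a 'with limit N' clause is appended when
# <recurlimit> is nonzero.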
4432 
4433  # Do dump stage action now.
4434 
4435  if dump_stage:
4436  for stagename in stagenames:
4437  print('Stage %s:' % stagename)
4438  stage = stages[stagename]
4439  print(stage)
4440 
4441  # Do dump project action now.
4442 
4443  if dump_project:
4444  print(project)
4445 
4446  # Do outdir action now.
4447 
4448  if print_outdir:
4449  for stagename in stagenames:
4450  print('Stage %s:' % stagename)
4451  stage = stages[stagename]
4452  print(stage.outdir)
4453 
4454  # Do logdir action now.
4455 
4456  if print_logdir:
4457  for stagename in stagenames:
4458  print('Stage %s:' % stagename)
4459  stage = stages[stagename]
4460  print(stage.logdir)
4461 
4462  # Do workdir action now.
4463 
4464  if print_workdir:
4465  for stagename in stagenames:
4466  print('Stage %s:' % stagename)
4467  stage = stages[stagename]
4468  print(stage.workdir)
4469 
4470  # Do bookdir action now.
4471 
4472  if print_bookdir:
4473  for stagename in stagenames:
4474  print('Stage %s:' % stagename)
4475  stage = stages[stagename]
4476  print(stage.bookdir)
4477 
4478  # Do defname action now.
4479 
4480  if defname:
4481  for stagename in stagenames:
4482  print('Stage %s:' % stagename)
4483  stage = stages[stagename]
4484  if stage.defname != '':
4485  print(stage.defname)
4486 
4487  # Do input_files action now.
4488 
4489  if do_input_files:
4490  for stagename in stagenames:
4491  print('Stage %s:' % stagename)
4492  stage = stages[stagename]
4493  input_files = get_input_files(stage)
4494  for input_file in input_files:
4495  print(input_file)
4496 
4497  # Do check_submit action now.
4498 
4499  if do_check_submit:
4500  for stagename in stagenames:
4501  print('Stage %s:' % stagename)
4502  stage = stages[stagename]
4503  stage.checksubmit()
4504 
4505  # Do check_input action now.
4506 
4507  if do_check_input:
4508  for stagename in stagenames:
4509  print('Stage %s:' % stagename)
4510  stage = stages[stagename]
4511  stage.checkinput(checkdef=True)
4512 
4513  # Do shorten action now.
4514 
4515  if shorten:
4516  for stagename in stagenames:
4517  print('Stage %s:' % stagename)
4518  stage = stages[stagename]
4519  doshorten(stage)
4520 
4521  # Do actions.
4522 
4523  rc = 0
4524 
4525  if submit or makeup:
4526 
4527  # Submit jobs.
4528 
4529  for stagename in stagenames:
4530  print('Stage %s:' % stagename)
4531 
4532  if project_utilities.check_running(xmlfile, stagename):
4533  print('Skipping job submission because similar job submission process is running.')
4534  else:
4535  stage = stages[stagename]
4536  dosubmit(project, stage, makeup, stage.recur, dryrun)
4537 
4538  if check or checkana:
4539 
4540  # Check results from specified project stage.
4541 
4542  for stagename in stagenames:
4543  print('Stage %s:' % stagename)
4544  stage = stages[stagename]
4545  docheck(project, stage, checkana or stage.ana, stage.validate_on_worker)
4546 
4547  if fetchlog:
4548 
4549  # Fetch logfiles.
4550 
4551  for stagename in stagenames:
4552  print('Stage %s:' % stagename)
4553  stage = stages[stagename]
4554  rc += dofetchlog(project, stage)
4555 
4556  if mergehist or mergentuple or merge:
4557 
4558  # Make merged histogram or ntuple files using proper hadd option.
4559  # Makes a merged root file called anahist.root in the project output directory
4560 
4561  for stagename in stagenames:
4562  print('Stage %s:' % stagename)
4563  stage = stages[stagename]
4564  domerge(stage, mergehist, mergentuple)
4565 
4566  if audit:
4567 
4568  # Sam audit.
4569 
4570  for stagename in stagenames:
4571  print('Stage %s:' % stagename)
4572  stage = stages[stagename]
4573  doaudit(stage)
4574 
4575  if check_definition or define:
4576 
4577  # Make sam dataset definition.
4578 
4579  for stagename in stagenames:
4580  print('Stage %s:' % stagename)
4581  stage = stages[stagename]
4582  if stage.ana:
4583  if stage.ana_defname == '':
4584  print('No sam analysis dataset definition name specified for this stage.')
4585  return 1
4586  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4587  docheck_definition(stage.ana_defname, dim, define)
4588  else:
4589  if stage.defname == '':
4590  print('No sam dataset definition name specified for this stage.')
4591  return 1
4592  dim = project_utilities.dimensions_datastream(project, stage, ana=False)
4593  docheck_definition(stage.defname, dim, define)
4594 
4595  if check_definition_ana or define_ana:
4596 
4597  # Make sam dataset definition for analysis files.
4598 
4599  for stagename in stagenames:
4600  print('Stage %s:' % stagename)
4601  stage = stages[stagename]
4602  if stage.ana_defname == '':
4603  print('No sam analysis dataset definition name specified for this stage.')
4604  return 1
4605  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4606  docheck_definition(stage.ana_defname, dim, define_ana)
4607 
4608  if test_definition:
4609 
4610  # Print summary of files returned by dataset definition.
4611 
4612  for stagename in stagenames:
4613  print('Stage %s:' % stagename)
4614  stage = stages[stagename]
4615  if stage.ana:
4616  if stage.ana_defname == '':
4617  print('No sam dataset definition name specified for this stage.')
4618  return 1
4619  rc += dotest_definition(stage.ana_defname)
4620  else:
4621  if stage.defname == '':
4622  print('No sam dataset definition name specified for this stage.')
4623  return 1
4624  rc += dotest_definition(stage.defname)
4625 
4626  if test_definition_ana:
4627 
4628  # Print summary of files returned by analysis dataset definition.
4629 
4630  for stagename in stagenames:
4631  print('Stage %s:' % stagename)
4632  stage = stages[stagename]
4633  if stage.ana_defname == '':
4634  print('No sam dataset definition name specified for this stage.')
4635  return 1
4636  rc += dotest_definition(stage.ana_defname)
4637 
4638  if undefine:
4639 
4640  # Delete sam dataset definition.
4641 
4642  for stagename in stagenames:
4643  print('Stage %s:' % stagename)
4644  stage = stages[stagename]
4645  if stage.defname == '':
4646  print('No sam dataset definition name specified for this stage.')
4647  return 1
4648  rc += doundefine(stage.defname)
4649 
4650  if check_declarations or declare:
4651 
4652  # Check sam declarations.
4653 
4654  for stagename in stagenames:
4655  print('Stage %s:' % stagename)
4656  stage = stages[stagename]
4657  docheck_declarations(stage.bookdir, stage.outdir, declare, ana=stage.ana)
4658 
4659  if check_declarations_ana or declare_ana:
4660 
4661  # Check sam analysis declarations.
4662 
4663  for stagename in stagenames:
4664  print('Stage %s:' % stagename)
4665  stage = stages[stagename]
4666  docheck_declarations(stage.bookdir, stage.outdir, declare_ana, ana=True)
4667 
4668  if test_declarations:
4669 
4670  # Print summary of declared files.
4671 
4672  for stagename in stagenames:
4673  print('Stage %s:' % stagename)
4674  stage = stages[stagename]
4675  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4676  rc += dotest_declarations(dim)
4677 
4678  if test_declarations_ana:
4679 
4680  # Print summary of declared files.
4681 
4682  for stagename in stagenames:
4683  print('Stage %s:' % stagename)
4684  stage = stages[stagename]
4685  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4686  rc += dotest_declarations(dim)
4687 
4688  if check_locations or add_locations or clean_locations or remove_locations or upload:
4689 
4690  # Check sam disk locations.
4691 
4692  for stagename in stagenames:
4693  print('Stage %s:' % stagename)
4694  stage = stages[stagename]
4695  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4696  docheck_locations(dim, stage.outdir,
4697  add_locations, clean_locations, remove_locations,
4698  upload)
4699 
4700  if check_locations_ana or add_locations_ana or clean_locations_ana or \
4701  remove_locations_ana or upload_ana:
4702 
4703  # Check sam disk locations.
4704 
4705  for stagename in stagenames:
4706  print('Stage %s:' % stagename)
4707  stage = stages[stagename]
4708  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4709  docheck_locations(dim, stage.outdir,
4710  add_locations_ana, clean_locations_ana, remove_locations_ana,
4711  upload_ana)
4712 
4713  if check_tape:
4714 
4715  # Check sam tape locations.
4716 
4717  for stagename in stagenames:
4718  print('Stage %s:' % stagename)
4719  stage = stages[stagename]
4720  dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
4721  docheck_tape(dim)
4722 
4723  if check_tape_ana:
4724 
4725  # Check analysis file sam tape locations.
4726 
4727  for stagename in stagenames:
4728  print('Stage %s:' % stagename)
4729  stage = stages[stagename]
4730  dim = project_utilities.dimensions_datastream(project, stage, ana=True)
4731  docheck_tape(dim)
4732 
4733  # Done.
4734 
4735  return rc
4736 
4737 # Open and truncate a file for writing using larbatch_posix.open.
4738 
4739 def safeopen(destination):
4740  if larbatch_posix.exists(destination):
4741  larbatch_posix.remove(destination)
4742  file = larbatch_posix.open(destination, 'w')
4743  return file
4744 
4745 # Invoke main program.
4746 
4747 # Utility function to scan a file and return its contents as a list
4748 def scan_file(fileName):
4749  #openable = 1
4750  returnArray = []
4751  try:
4752  #print 'Reading %s' % fileName
4753  fileList = project_utilities.saferead(fileName)
4754  #if we can't find missing_files the check will not work
4755  except:
4756  #print 'Cannot open file: %s' % fileName
4757  return [ -1 ]
4758 
4759  if len(fileList) > 0:
4760  for line in fileList:
4761  returnArray.append(line.strip())
4762 
4763  else:
4764  #print '%s exists, but is empty' % fileName
4765 
4766  return [ -1 ]
4767 
4768  return returnArray
4769 
4770 if __name__ == '__main__':
4771  sys.exit(main(sys.argv))
4772 
4773 '''inputlist = []
4774  inp = open(stage.inputlist,"r")
4775  for line in inp:
4776  columns = line.split("/")
4777  columns = [col.strip() for col in columns]
4778  inputlist.append(columns[8])
4779  inp.close()'''