6 __doc__ =
"""Manages SAM definitions for ICARUS data run."""
7 __author__ =
"Gianluca Petrillo (petrillo@slac.stanford.edu)"
8 __date__ = time.strptime(
"July 7, 2021",
"%B %d, %Y")
11 from samweb_client.client
import SAMWebClient
12 import samweb_client.exceptions
as samexcpt
16 ExperimentName =
"ICARUS"
17 DefaultStages = [
'raw',
'stage0',
'stage1', ]
20 'raw':
"data_tier raw",
21 'stage0':
"icarus_project.stage stage0",
22 'stage1':
"icarus_project.stage stage1",
32 def __init__(self, run=None, stage=None, stream=None, projectVersion=None, ):
33 self.
run = SampleInfo._copyList(run)
34 self.
stage = SampleInfo._copyList(stage)
35 self.
stream = SampleInfo._copyList(stream)
39 def isRunDefined(self) -> "returns if run is collapsed to a value":
40 return not SampleInfo.hasOptions(self.
run) \
41 and (AllowAllRuns
or self.
run is not None)
45 return not SampleInfo.hasOptions(self.
stage)
48 return not SampleInfo.hasOptions(self.
stream)
53 def isComplete(self) -> "returns if there are no ambiguities left in the info":
59 components = [ ExperimentName,
'data' ]
61 components.append(f
"run{self.run}")
63 components.append(self.
stage)
65 components.append(self.
stream)
72 kwargs.setdefault(
'run', self.
run)
73 kwargs.setdefault(
'stage', self.
stage),
74 kwargs.setdefault(
'stream', self.
stream),
80 s = f
"run {self.run if self.run else 'unspecified'}"
81 if self.
stage: s += f
", stage {self.stage}"
82 if self.
stream: s += f
", stream {self.stream}"
83 if self.
projectVersion: s += f
", project version {self.projectVersion}"
89 return value
if SampleInfo.hasOptions(value)
else [ value ]
93 return SampleInfo._isIterable(value)
and not isinstance(value, str)
100 return value[:]
if SampleInfo._isIterable(value)
else value
107 """Callable object making a SAM query out of a `SampleInfo` object."""
110 info:
"SampleInfo object with all the information for the query",
111 minimum:
"throws an exception if fewer than these elements are included" = 0,
112 ) ->
"a SAM dimensions query string":
115 DimensionQueryMaker.simpleItem(
'run_number', info.run),
116 DimensionQueryMaker.multiItem(StageDimensions, info.stage,
'stage'),
117 DimensionQueryMaker.simpleItem(
'data_stream', info.stream),
118 DimensionQueryMaker.simpleItem(
'icarus_project.version', info.projectVersion),
121 if len(dims) < minimum:
122 raise RuntimeError(f
"Query resulted in only {len(dims)} constraints: '{query}'")
132 if SampleInfo.hasOptions(value):
133 return f
"{key} in ( { ' '.join(map(str, value)) } )"
134 elif value
is not None:
return f
"{key} {value}"
141 values = SampleInfo.makeOptionList(values)
142 if not values
or None in values:
return ""
146 dims.append(queries[value])
148 raise RuntimeError(f
"{typeName} '{value}' not supported.")
150 if len(dims) > 1: dims =
list(map(DimensionQueryMaker.addParentheses, dims))
151 query =
" or ".
join(dims)
if dims
else ""
152 if len(dims) > 1: query = DimensionQueryMaker.addParentheses(query)
163 self.
samweb = samweb
if samweb
else SAMWebClient()
167 assert self.
samweb,
"SAM web client not initialized. We do not go anywhere."
170 if info.projectVersion
is None:
171 if info.stage !=
'raw':
174 else: projectVersions = [
None ]
176 projectVersions = SampleInfo.makeOptionList(info.projectVersion)
179 for prjVer
in projectVersions:
180 prjInfo = info.copy(projectVersion=prjVer)
187 assert info.isStageComplete(),
"Stage must be set."
188 for stream
in SampleInfo.makeOptionList(info.stream):
189 streamInfo = info.copy(stream=stream)
195 def iterateStages(self, info:
"SampleInfo object defining iteration ranges"):
196 assert info.stage
is not None,
"Stages needs to be explicitly specified."
197 for stage
in SampleInfo.makeOptionList(info.stage):
198 assert stage,
"Stage must be specified."
199 stageInfo = info.copy(stage=stage)
200 logging.debug(f
"Processing {stageInfo}")
206 def iterate(self, info:
"SampleInfo object defining iteration ranges"):
207 """Iterates through all elements in `info`."""
208 for run
in SampleInfo.makeOptionList(info.run):
209 runInfo = info.copy(run=run)
217 """Returns the project versions from the all files matching `dims`."""
218 logging.debug(
"Discovering project versions for %s", dims)
221 files = self.samweb.listFiles(dims, fileinfo=
True)
222 except samexcpt.Error:
223 logging.error(f
"SAM exception while translating query: '{dims}'")
226 logging.debug(f
" => querying metadata for {len(files)} files")
229 meta[
'icarus_project.version']
230 for meta
in self.samweb.getMetadataIterator(fInfo.file_id
for fInfo
in files)
232 logging.debug(f
" => extracted {len(versions)} versions: %s", versions)
233 return list(versions)
242 def __init__(self, samweb, create=False, query=False, printDefs=False,
243 describe=
False, check=
False, delete=
False,
244 fake=
False, force=
False, prependUser=
True,
248 if check: self.actions.append(
"check")
249 if create: self.actions.append(
"create")
250 if query: self.actions.append(
"query")
251 if printDefs: self.actions.append(
"printDefs")
252 if describe: self.actions.append(
"describe")
253 if delete: self.actions.append(
"delete")
255 raise RuntimeError(
"At least one action needs to be enabled.")
260 self.
samweb = samweb
if samweb
else SAMWebClient()
264 except samexcpt.Error
as e:
266 logging.error(
"Could not find out your name! %s", e)
275 assert self.
samweb,
"SAM web client not initialized. We do not go anywhere."
277 if not info.isComplete():
278 raise RuntimeError(f
"Can't process incomplete specification: {info}")
282 logging.debug(
"Info: %s => dim='%s'", info, dim)
287 if action ==
"printDefs":
290 elif action ==
"check":
293 elif action ==
"describe":
296 elif action ==
"query":
297 self.
doQuery(info=info, defName=defName, dim=dim)
299 elif action ==
"create":
302 elif action ==
"delete":
306 raise RuntimeError(f
"LOGIC ERROR: action {action} not implemented.")
316 print(f
"DRYRUN> descDefinition({defName!r})")
319 print(self.samweb.descDefinition(defName))
320 except samexcpt.DefinitionNotFound:
321 print(f
"Definition Name: {defName} => NOT FOUND")
326 print(f
"DRYRUN> {dim}")
329 summary = self.
getSummary(info=info, defName=defName, dims=dim)
330 except samexcpt.Error
as e:
331 logging.error(f
"Query of definition {defName} (query: '{dim}') failed: %s", e)
334 print(f
"{info}: {(summary['total_event_count'] if summary['total_event_count'] else 'unknown')} events"
335 f
" in {summary['file_count']} files"
336 f
" ({summary['total_file_size']/(1 << 30):g} GiB)"
340 return summary
if summary
else e
346 print(f
"DRYRUN> countFiles(defname={defName!r})")
350 print(f
"{defName} not available")
353 print(f
"{defName} available ({count} files)")
364 if count
is not None:
365 logging.error(f
"Definition {defName!r} already exists (and matches {count} files).")
369 count = self.samweb.countFiles(dimensions=dim)
370 except samexcpt.Error
as e:
371 logging.error(f
"Attempt to count matches with {dim!r} failed: %s", e)
374 print(f
"Definition {defName} NOT created as it would match no file (query: {dim!r})")
376 logging.debug(f
"Creating {defName!r}, now matching {count} files")
378 print(f
"DRYRUN> createDefinition(defname={defName!r}, dims={dim!r}, description={descr!r})")
381 self.samweb.createDefinition(defname=defName, dims=dim, description=descr)
382 print(f
"{defName} created ({count} files)")
383 except samexcpt.Error
as e:
385 (f
"Failed to create definition {defName} from query='{dim}': %s", e)
399 defInfo = self.samweb.descDefinitionDict(defName)
400 except samexcpt.DefinitionNotFound:
401 print(f
"Definition Name: {defName} => NOT FOUND")
405 ForcedMsg = {
True:
"forced to delete it anyway",
False:
"won't delete unless forced to", }
409 try: self.
SAMuser = self.samweb.get_user()
410 except samexcpt.Error
as e:
411 logging.error(
"Could not find out your name! %s", e)
413 try: SAMgroup = self.samweb.get_group()
414 except samexcpt.Error
as e:
415 logging.error(
"Could not find out the name of your group! %s", e)
416 logging.debug(f
"You appear to be {SAMuser!r} of group {SAMgroup!r}")
418 if defInfo[
'username'] != SAMuser:
420 f
"Definition {defName!r} was created on {defInfo['create_time']}"
421 f
" by {defInfo['username']}/{defInfo['group']}, not by you ({SAMuser})"
427 if defInfo[
'group'] != SAMgroup:
429 f
"Definition {defName!r} was created on {defInfo['create_time']}"
430 f
" by {defInfo['username']}/{defInfo['group']}, not by your group ({SAMgroup})"
436 if defInfo[
'dimensions'] != dim:
437 logging.warning(f
"Definition {defName!r} has unexpected query:"
438 f
" ({defInfo['dimensions']!r}, expected: {dim!r}); {ForcedMsg[self.force]}."
440 if not self.
force: checksOk =
False
443 if defInfo[
'description'] != descr:
445 f
"Definition {defName!r} appears not to be created with this program:"
446 f
" description mismatch"
447 f
" ({defInfo['description']!r}, expected: {descr!r}); {ForcedMsg[self.force]}."
449 if not self.
force: checksOk =
False
453 logging.error(f
"Definition {defName!r} will NOT be deleted.")
457 print(f
"DRYRUN> deleteDefinition({defName!r})")
460 self.samweb.deleteDefinition(defName)
461 except samexcpt.DefinitionNotFound:
462 logging.error(f
"Definition {defName} NOT FOUND (can't be deleted)")
465 except samexcpt.NoAuthorizationCredentials
as e:
466 logging.error(f
"Failed to delete definition {defName!r} for lack of credentials: %s", e)
467 except samexcpt.Error
as e:
468 logging.error(f
"Failed to delete definition {defName!r}: %s", e)
471 if count
is not None:
472 logging.error(f
"Deletion of definition {defName!r} silently FAILED"
473 f
" (still there with its own {count} files).")
476 print(f
"Definition {defName} successfully deleted.")
483 else:
return info.defName()
487 assert info
or defName
or dims
488 e = RuntimeError(
"Insufficient parameters to get summary")
490 if not defName
and info: defName = self.
buildDefName(info)
492 try:
return self.samweb.listFilesSummary(defname=defName)
493 except samexcpt.DefinitionNotFound
as e: queryError = e
495 if not dims
and info: dims = self.
buildQuery(info)
497 try:
return self.samweb.listFilesSummary(dims)
498 except samexcpt.Error
as e: queryError = e
505 """Returns the count of files of `defName`, `None` if not found.
507 Throws exception in all other error situations.
509 try:
return self.samweb.countFiles(defname=defName)
510 except samexcpt.DefinitionNotFound:
return None
521 if not isinstance(l, str)
and len(l) == 1:
return next(iter(l))
522 except TypeError:
pass
527 if __name__ ==
"__main__":
531 parser = argparse.ArgumentParser(description=__doc__)
533 SampleGroup = parser.add_argument_group(title=
"Sample selection")
534 SampleGroup.add_argument(
"runs", nargs=
"*", type=int, help=
"runs to process")
535 SampleGroup.add_argument(
"--stage",
"-s", action=
"append",
536 help=
"stages to include {DefaultStages}")
537 SampleGroup.add_argument(
"--prjversion",
"-p", action=
"append",
538 help=
"project versions to include [autodetect (resource-intensive!)]")
539 SampleGroup.add_argument(
"--stream",
"-f", action=
"append",
540 help=
"data streams to include (use 'any' for... any) [any]")
541 SampleGroup.add_argument(
"--global",
"-g", dest=
'globalDef',
542 action=
"store_true", help=
"do not prepend SAM user name to definitions")
544 ActionGroup = parser.add_argument_group(title=
"Actions")
545 ActionGroup.add_argument(
"--check", action=
"store_true",
546 help=
"prints whether the definition for the sample is available")
547 ActionGroup.add_argument(
"--describe", action=
"store_true",
548 help=
"describes an existing definition for the sample")
549 ActionGroup.add_argument(
"--query", action=
"store_true",
550 help=
"queries the definitions related to the samples")
551 ActionGroup.add_argument(
"--defname", action=
"store_true",
552 help=
"prints the name of the definitions related to the samples")
553 ActionGroup.add_argument(
"--create", action=
"store_true",
554 help=
"creates one definition per sample (use --defname to see their names)")
555 ActionGroup.add_argument(
"--delete", action=
"store_true",
556 help=
"attempts to remove one definition per sample")
558 GeneralOptGroup = parser.add_argument_group(title=
"General options")
559 GeneralOptGroup.add_argument(
"--test", action=
"store_true",
560 help=
"tests the connection to SAM and exits")
561 GeneralOptGroup.add_argument(
"--force",
"-F", action=
"store_true",
562 help=
"skips safety checks of some operations")
563 GeneralOptGroup.add_argument(
"--fake",
"--dryrun",
"-n", action=
"store_true",
564 help=
"does not perform actual creation and query actions")
565 GeneralOptGroup.add_argument(
"--debug", action=
"store_true",
566 help=
"enable verbose debugging output")
567 GeneralOptGroup.add_argument(
"--version",
"-V", action=
"version",
568 version=f
"%(prog)s v{__version__} ({time.asctime(__date__)})",
569 help=
"prints the version number")
571 args = parser.parse_args()
573 logging.getLogger().setLevel(logging.DEBUG
if args.debug
else logging.INFO)
574 if args.stage
is None: args.stage = DefaultStages
576 args.stream = [
None if s ==
"any" else s
for s
in args.stream ]
578 try: samweb = SAMWebClient()
579 except samexcpt.Error
as e:
581 print(
"Failed to connect to SAM.");
583 logging.error(
"Failed to connect to SAM: %s", e)
587 try:
print(samweb.serverInfo())
588 except samexcpt.Error
as e:
589 logging.error(
"Test connection to SAM failed: %s", e)
591 print(
"\nConnection test succeeded.")
595 if not AllowAllRuns
and not args.runs:
596 logging.error(
"At least one run MUST be specified.")
602 create=args.create, query=args.query, printDefs=args.defname,
603 describe=args.describe, check=args.check, delete=args.delete,
604 fake=args.fake, force=args.force, prependUser=
not args.globalDef
def iterateProjectVersions
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def _discoverProjectVersions
S join(S const &sep, Coll const &s)
Returns a concatenation of strings in s separated by sep.
def isProjectVersionComplete