All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
manageDataRunDefinitions.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import logging
4 import time
5 
# module metadata; `__doc__` feeds the `--help` description and
# `__version__`/`__date__` feed the `--version` output of the script
__version__ = "1.0"
__date__ = time.strptime("July 7, 2021", "%B %d, %Y")
__author__ = "Gianluca Petrillo (petrillo@slac.stanford.edu)"
__doc__ = """Manages SAM definitions for ICARUS data run."""
10 
11 from samweb_client.client import SAMWebClient
12 import samweb_client.exceptions as samexcpt
13 
logging.basicConfig()

ExperimentName = "ICARUS"

# stages processed when none is requested on the command line
DefaultStages = [ 'raw', 'stage0', 'stage1', ]

# SAM dimension constraint selecting the files of each supported stage
StageDimensions = dict(
    raw="data_tier raw",
    stage0="icarus_project.stage stage0",
    stage1="icarus_project.stage stage1",
) # StageDimensions

# this is a modal flag that is dangerous enough to be not user-controlled:
# if `True`, queries without specifying a run number will be allowed.
AllowAllRuns = False
28 
29 
30 # ------------------------------------------------------------------------------
class SampleInfo:
    """Description of a (possibly ambiguous) set of ICARUS data samples.

    Each attribute (`run`, `stage`, `stream`, `projectVersion`) may be `None`
    (unspecified), a single value, or an iterable of alternative values
    ("options"). Strings are always treated as single values.
    """

    def __init__(self, run=None, stage=None, stream=None, projectVersion=None, ):
        # keep private copies of option lists, so that later changes to the
        # caller's containers do not affect this object
        self.run = SampleInfo._copyList(run)
        self.stage = SampleInfo._copyList(stage)
        self.stream = SampleInfo._copyList(stream)
        self.projectVersion = SampleInfo._copyList(projectVersion)
    # __init__()

    def isRunDefined(self) -> "returns if run is collapsed to a value":
        """Returns whether the run is a single value.

        An unspecified run (`None`) counts as defined only if `AllowAllRuns`.
        """
        return not SampleInfo.hasOptions(self.run) \
            and (AllowAllRuns or self.run is not None)
    # isRunDefined()

    def isStageComplete(self) -> "returns if stage is collapsed to a value":
        """Returns whether the stage is not a list of options."""
        return not SampleInfo.hasOptions(self.stage)

    def isStreamComplete(self) -> "returns if stream is collapsed to a value":
        """Returns whether the stream is not a list of options."""
        return not SampleInfo.hasOptions(self.stream)

    def isProjectVersionComplete(self) -> "returns if project version is collapsed to a value":
        """Returns whether the project version is not a list of options."""
        return not SampleInfo.hasOptions(self.projectVersion)

    def isComplete(self) -> "returns if there are no ambiguities left in the info":
        """Returns whether all the attributes are collapsed to a single value."""
        return self.isRunDefined() and self.isStageComplete() \
            and self.isStreamComplete() and self.isProjectVersionComplete()
    # isComplete()

    def defName(self):
        """Returns the SAM definition name for this sample.

        The name is `<ExperimentName>_data`, followed by `run<N>`, stage,
        stream and project version (each only when it is a single,
        non-`None` value), joined by underscores.
        """
        components = [ ExperimentName, 'data' ]
        if self.isRunDefined() and self.run is not None:
            components.append(f"run{self.run}")
        if self.isStageComplete() and self.stage is not None:
            components.append(self.stage)
        if self.isStreamComplete() and self.stream is not None:
            components.append(self.stream)
        if self.isProjectVersionComplete() and self.projectVersion is not None:
            components.append(self.projectVersion)
        return "_".join(filter(None, components))
    # defName()

    def copy(self, **kwargs):
        """Returns a new `SampleInfo`, with attributes from `kwargs` overriding
        the ones of this object."""
        kwargs.setdefault('run', self.run)
        kwargs.setdefault('stage', self.stage)
        kwargs.setdefault('stream', self.stream)
        kwargs.setdefault('projectVersion', self.projectVersion)
        return SampleInfo(**kwargs)
    # copy()

    def __str__(self):
        s = f"run {self.run if self.run else 'unspecified'}"
        if self.stage: s += f", stage {self.stage}"
        if self.stream: s += f", stream {self.stream}"
        if self.projectVersion: s += f", project version {self.projectVersion}"
        return s
    # __str__()

    @staticmethod
    def makeOptionList(value):
        """Returns `value` if it is a list of options, `[ value ]` otherwise."""
        return value if SampleInfo.hasOptions(value) else [ value ]

    @staticmethod
    def hasOptions(value):
        """Returns whether `value` is a (non-string) iterable of options."""
        return SampleInfo._isIterable(value) and not isinstance(value, str)

    @staticmethod
    def _isIterable(value): return hasattr(value, "__iter__")

    @staticmethod
    def _copyList(value):
        """Returns a new list with the options in `value`, or `value` itself
        if it is a single value.

        Building a `list` (instead of slicing) also supports iterables that
        can't be sliced, like sets and generators.
        """
        return list(value) if SampleInfo.hasOptions(value) else value

# class SampleInfo
103 
104 
105 # ------------------------------------------------------------------------------
class DimensionQueryMaker:
    """Callable object making a SAM query out of a `SampleInfo` object."""

    def __call__(self,
        info: "SampleInfo object with all the information for the query",
        minimum: "throws an exception if fewer than these elements are included" = 0,
        ) -> "a SAM dimensions query string":
        """Returns the SAM dimensions query selecting the files of `info`.

        Each specified attribute of `info` contributes one constraint, and the
        constraints are joined with `and`.
        Raises `RuntimeError` if fewer than `minimum` constraints result, or if
        the stage is not in `StageDimensions`.
        """
        dims = [
            DimensionQueryMaker.simpleItem('run_number', info.run),
            DimensionQueryMaker.multiItem(StageDimensions, info.stage, 'stage'),
            DimensionQueryMaker.simpleItem('data_stream', info.stream),
            DimensionQueryMaker.simpleItem('icarus_project.version', info.projectVersion),
        ]
        # unspecified attributes yield empty strings, which are not constraints
        # and must not count toward `minimum`
        dims = [ dim for dim in dims if dim ]
        query = " and ".join(dims)
        if len(dims) < minimum:
            raise RuntimeError(f"Query resulted in only {len(dims)} constraints: '{query}'")
        return query
    # __call__()

    @staticmethod
    def addParentheses(s):
        """Returns `s` wrapped in parentheses, or an empty string if `s` is empty."""
        return "( " + s + " )" if s else ""

    @staticmethod
    def simpleItem(key, value, typeName=None):
        """Returns the constraint on `key`: `key value` for a single value,
        `key in ( v1 v2 ... )` for a list of options, empty if `value` is `None`.

        `typeName` is currently unused.
        """
        if SampleInfo.hasOptions(value):
            return f"{key} in ( { ' '.join(map(str, value)) } )"
        elif value is not None: return f"{key} {value}"
        else: return ""
    # simpleItem()

    @staticmethod
    def multiItem(queries, values, typeName):
        """Returns the `or` of the queries associated to each of `values`.

        `queries` maps each supported value to its query. The result is empty
        if `values` is empty or includes `None`.
        Raises `RuntimeError` (mentioning `typeName`) on unsupported values.
        """
        values = SampleInfo.makeOptionList(values)
        if not values or None in values: return ""
        dims = []
        for value in values:
            try:
                dims.append(queries[value])
            except KeyError:
                raise RuntimeError(f"{typeName} '{value}' not supported.")
        # for
        if len(dims) > 1: dims = list(map(DimensionQueryMaker.addParentheses, dims))
        query = " or ".join(dims) if dims else ""
        if len(dims) > 1: query = DimensionQueryMaker.addParentheses(query)
        return query
    # multiItem()

# class DimensionQueryMaker
157 
158 
159 # ------------------------------------------------------------------------------
161 
class SampleBrowser:
    """Expands a `SampleInfo` into complete samples.

    Iteration nests runs, then stages, then streams, then project versions.
    Project versions that are not specified are discovered from SAM metadata,
    except for the `raw` stage.
    """

    def __init__(self, samweb):
        # a fresh client is created when none (or a falsy one) is provided
        self.samweb = samweb or SAMWebClient()


    def iterateProjectVersions(self, info: "SampleInfo object defining iteration ranges"):
        """Yields a copy of `info` for each of its project versions."""
        assert self.samweb, "SAM web client not initialized. We do not go anywhere."

        if info.projectVersion is not None:
            # explicit versions: each one is treated separately
            versions = SampleInfo.makeOptionList(info.projectVersion)
        elif info.stage == 'raw':
            versions = [ None ] # no version constraint is ok for raw files
        else:
            versions = self._discoverProjectVersions(
                DimensionQueryMaker()(info, minimum=2)
            )

        for version in versions:
            yield info.copy(projectVersion=version)
    # iterateProjectVersions()


    def iterateStreams(self, info: "SampleInfo object defining iteration ranges"):
        """Yields the samples for each stream (and project version) of `info`."""
        assert info.isStageComplete(), "Stage must be set."
        for stream in SampleInfo.makeOptionList(info.stream):
            yield from self.iterateProjectVersions(info.copy(stream=stream))
    # iterateStreams()


    def iterateStages(self, info: "SampleInfo object defining iteration ranges"):
        """Yields the samples for each stage of `info`; stages are mandatory."""
        assert info.stage is not None, "Stages needs to be explicitly specified."
        for stage in SampleInfo.makeOptionList(info.stage):
            assert stage, "Stage must be specified."
            stageInfo = info.copy(stage=stage)
            logging.debug(f"Processing {stageInfo}")
            yield from self.iterateStreams(stageInfo)
    # iterateStages()


    def iterate(self, info: "SampleInfo object defining iteration ranges"):
        """Yields every complete sample described by `info`."""
        for run in SampleInfo.makeOptionList(info.run):
            yield from self.iterateStages(info.copy(run=run))
    # iterate()

    def __call__(self, info):
        return self.iterate(info)


    def _discoverProjectVersions(self, dims):
        """Returns the distinct project versions of the files matching `dims`."""
        logging.debug("Discovering project versions for %s", dims)

        try:
            files = self.samweb.listFiles(dims, fileinfo=True)
        except samexcpt.Error:
            logging.error(f"SAM exception while translating query: '{dims}'")
            raise

        logging.debug(f" => querying metadata for {len(files)} files")

        fileIDs = (fileInfo.file_id for fileInfo in files)
        versions = {
            metadata['icarus_project.version']
            for metadata in self.samweb.getMetadataIterator(fileIDs)
        }
        logging.debug(f" => extracted {len(versions)} versions: %s", versions)
        return list(versions)
    # _discoverProjectVersions()

# class SampleBrowser
237 
238 
239 # ------------------------------------------------------------------------------
241 
class SampleProcessClass:
    """Runs the selected actions on the SAM definition of each sample.

    For each complete `SampleInfo` passed to the call operator, the enabled
    actions run in this order: check, create, query, printDefs, describe,
    delete.
    """

    def __init__(self, samweb, create=False, query=False, printDefs=False,
                 describe=False, check=False, delete=False,
                 fake=False, force=False, prependUser=True,
                 ):
        """Configures the actions and the SAM connection.

        Parameters:
        - `samweb`: SAM web client; a new `SAMWebClient` is created if falsy
        - `create`, `query`, `printDefs`, `describe`, `check`, `delete`:
          enable the corresponding action
        - `fake`: dry run; the main SAM operation of each action is printed
          instead of executed
        - `force`: skip some of the safety checks on deletion
        - `prependUser`: prefix definition names with the SAM user name

        Raises `RuntimeError` if no action is enabled. Re-raises the SAM error
        if the user name is required (`prependUser`) but can't be obtained.
        """
        # action collection (this order is also the execution order)
        self.actions = []
        if check: self.actions.append("check")
        if create: self.actions.append("create")
        if query: self.actions.append("query")
        if printDefs: self.actions.append("printDefs")
        if describe: self.actions.append("describe")
        if delete: self.actions.append("delete")
        if not self.actions:
            raise RuntimeError("At least one action needs to be enabled.")
        self.fake = fake
        self.force = force
        self.prependUser = prependUser

        self.samweb = samweb if samweb else SAMWebClient()

        # ask the client actually in use: the `samweb` argument may be `None`
        try: self.SAMuser = self.samweb.get_user()
        except samexcpt.Error as e:
            if self.prependUser:
                logging.error("Could not find out your name! %s", e)
                raise
            self.SAMuser = None
        #
    # __init__()


    def buildQuery(self, info):
        """Returns the SAM dimensions query selecting the files of `info`."""
        # NOTE(review): this is called throughout the class, but its definition
        # is absent from the available listing (a line of `__init__` appears to
        # be missing there); reconstructed with `DimensionQueryMaker`, confirm.
        return DimensionQueryMaker()(info)
    # buildQuery()


    def __call__(self, info):
        """Runs all the enabled actions on `info`; returns the definition name.

        Raises `RuntimeError` if `info` is not complete.
        """
        assert self.samweb, "SAM web client not initialized. We do not go anywhere."

        if not info.isComplete():
            raise RuntimeError(f"Can't process incomplete specification: {info}")

        dim = self.buildQuery(info)

        logging.debug("Info: %s => dim='%s'", info, dim)
        defName = self.buildDefName(info)

        for action in self.actions:

            if action == "printDefs":
                print(defName)

            elif action == "check":
                self.doCheck(defName)

            elif action == "describe":
                self.doDescribe(defName)

            elif action == "query":
                self.doQuery(info=info, defName=defName, dim=dim)

            elif action == "create":
                self.doCreateDef(info, defName=defName, dim=dim)

            elif action == "delete":
                self.doDeleteDef(info, defName=defName, dim=dim)

            else:
                raise RuntimeError(f"LOGIC ERROR: action {action} not implemented.")

        # for

        return defName
    # __call__()


    def doDescribe(self, defName):
        """Prints the SAM description of `defName`, or that it is not found."""
        if self.fake:
            print(f"DRYRUN> descDefinition({defName!r})")
            return
        try:
            print(self.samweb.descDefinition(defName))
        except samexcpt.DefinitionNotFound:
            print(f"Definition Name: {defName} => NOT FOUND")
    # doDescribe()

    def doQuery(self, info, defName, dim):
        """Prints event count, file count and size of the sample.

        Returns the SAM summary on success, the SAM exception on failure,
        `None` in dry run.
        """
        if self.fake:
            print(f"DRYRUN> {dim}")
            return None
        try:
            summary = self.getSummary(info=info, defName=defName, dims=dim)
        except samexcpt.Error as e:
            logging.error(f"Query of definition {defName} (query: '{dim}') failed: %s", e)
            # `e` is unbound once the `except` clause ends: return it from here
            return e
        print(f"{info}: {(summary['total_event_count'] if summary['total_event_count'] else 'unknown')} events"
            f" in {summary['file_count']} files"
            f" ({summary['total_file_size']/(1 << 30):g} GiB)"
            )
        return summary
    # doQuery()

    def doCheck(self, defName):
        """Prints whether `defName` exists; returns `True` if it does
        (always `True` in dry run)."""
        assert defName
        if self.fake:
            print(f"DRYRUN> countFiles(defname={defName!r})")
            return True
        count = self.defNameCount(defName)
        if count is None:
            print(f"{defName} not available")
            return False
        else:
            print(f"{defName} available ({count} files)")
            return True
    # doCheck()

    def doCreateDef(self, info, defName=None, dim=None):
        """Creates the SAM definition of `info`.

        The definition is not created if it already exists or if its query
        matches no file. Returns the definition name on success (also in dry
        run), `None` otherwise.
        """
        assert info
        if not defName: defName = self.buildDefName(info)
        if not dim: dim = self.buildQuery(info)
        descr = self.describeSample(info)

        count = self.defNameCount(defName)
        if count is not None:
            logging.error(f"Definition {defName!r} already exists (and matches {count} files).")
            return None

        try:
            count = self.samweb.countFiles(dimensions=dim)
        except samexcpt.Error as e:
            logging.error(f"Attempt to count matches with {dim!r} failed: %s", e)
            return None
        if count == 0:
            print(f"Definition {defName} NOT created as it would match no file (query: {dim!r})")
            return None
        logging.debug(f"Creating {defName!r}, now matching {count} files")
        if self.fake:
            print(f"DRYRUN> createDefinition(defname={defName!r}, dims={dim!r}, description={descr!r})")
        else:
            try:
                self.samweb.createDefinition(defname=defName, dims=dim, description=descr)
                print(f"{defName} created ({count} files)")
            except samexcpt.Error as e:
                logging.error \
                    (f"Failed to create definition {defName} from query='{dim}': %s", e)
                return None
        # if
        return defName
    # doCreateDef()

    def doDeleteDef(self, info, defName=None, dim=None):
        """Deletes the SAM definition of `info`, after safety checks.

        The definition must have been created by the current SAM user and
        group; unless `force` is set, its query and description must also
        match the ones this program builds for `info`.
        Returns the name of the deleted definition (also in dry run), `None`
        if the definition was not deleted.
        """
        assert self.samweb
        assert info
        if not defName: defName = self.buildDefName(info)
        if not dim: dim = self.buildQuery(info)
        descr = self.describeSample(info)

        try:
            defInfo = self.samweb.descDefinitionDict(defName)
        except samexcpt.DefinitionNotFound:
            print(f"Definition Name: {defName} => NOT FOUND") # we are ok... I guess
            return None

        # we perform many checks to make sure this deletion is proper
        ForcedMsg = { True: "forced to delete it anyway", False: "won't delete unless forced to", }
        checksOk = True

        if not self.SAMuser:
            try: self.SAMuser = self.samweb.get_user()
            except samexcpt.Error as e:
                logging.error("Could not find out your name! %s", e)
        # if not cached already
        # an unknown (`None`) user or group is expected not to match the
        # definition owner, and therefore to block the deletion
        SAMuser = self.SAMuser
        try: SAMgroup = self.samweb.get_group()
        except samexcpt.Error as e:
            logging.error("Could not find out the name of your group! %s", e)
            SAMgroup = None
        logging.debug(f"You appear to be {SAMuser!r} of group {SAMgroup!r}")

        if defInfo['username'] != SAMuser:
            logging.warning(
                f"Definition {defName!r} was created on {defInfo['create_time']}"
                f" by {defInfo['username']}/{defInfo['group']}, not by you ({SAMuser})"
                f": won't delete."
            )
            checksOk = False
        # if

        if defInfo['group'] != SAMgroup:
            logging.warning(
                f"Definition {defName!r} was created on {defInfo['create_time']}"
                f" by {defInfo['username']}/{defInfo['group']}, not by your group ({SAMgroup})"
                f": won't delete."
            )
            checksOk = False
        # if

        if defInfo['dimensions'] != dim:
            logging.warning(f"Definition {defName!r} has unexpected query:"
                f" ({defInfo['dimensions']!r}, expected: {dim!r}); {ForcedMsg[self.force]}."
            )
            if not self.force: checksOk = False
        #

        if defInfo['description'] != descr:
            logging.warning(
                f"Definition {defName!r} appears not to be created with this program:"
                f" description mismatch"
                f" ({defInfo['description']!r}, expected: {descr!r}); {ForcedMsg[self.force]}."
            )
            if not self.force: checksOk = False
        # if

        if not checksOk:
            logging.error(f"Definition {defName!r} will NOT be deleted.")
            return None

        if self.fake:
            print(f"DRYRUN> deleteDefinition({defName!r})")
            return defName
        # a failed deletion request is reported and ends here, rather than
        # falling through to the "still there" / "successfully deleted" report
        # NOTE: a definition already used by a project may not be deletable;
        # which SAM exception reports that case is not known here.
        try:
            self.samweb.deleteDefinition(defName)
        except samexcpt.DefinitionNotFound:
            logging.error(f"Definition {defName} NOT FOUND (can't be deleted)")
            return None
        except samexcpt.NoAuthorizationCredentials as e:
            logging.error(f"Failed to delete definition {defName!r} for lack of credentials: %s", e)
            return None
        except samexcpt.Error as e:
            logging.error(f"Failed to delete definition {defName!r}: %s", e)
            return None

        count = self.defNameCount(defName)
        if count is not None:
            logging.error(f"Deletion of definition {defName!r} silently FAILED"
                f" (still there with its own {count} files).")
            return None
        # if still there
        print(f"Definition {defName} successfully deleted.")
        return defName
    # doDeleteDef()


    def buildDefName(self, info):
        """Returns the definition name of `info`, with the SAM user name
        prepended if `prependUser` is set."""
        if self.prependUser: return self.SAMuser + "_" + info.defName()
        else: return info.defName()
    # buildDefName()

    def getSummary(self, info=None, defName=None, dims=None):
        """Returns the SAM file summary of a sample.

        The definition (`defName`, or the one built from `info`) is tried
        first. If it is not found, the query (`dims`, or the one built from
        `info`) is used instead.
        Raises the last error met if no summary could be obtained.
        """
        assert info or defName or dims
        # the name bound by `except ... as` is deleted at the end of the clause,
        # so the error to be raised is kept in a separate variable
        queryError = RuntimeError("Insufficient parameters to get summary") # should not happen

        if not defName and info: defName = self.buildDefName(info)
        if defName:
            try: return self.samweb.listFilesSummary(defname=defName)
            except samexcpt.DefinitionNotFound as e: queryError = e

        if not dims and info: dims = self.buildQuery(info)
        if dims:
            try: return self.samweb.listFilesSummary(dims)
            except samexcpt.Error as e: queryError = e

        raise queryError
    # getSummary()


    def defNameCount(self, defName: "SAM definition name"):
        """Returns the count of files of `defName`, `None` if not found.

        Throws exception in all other error situations.
        """
        try: return self.samweb.countFiles(defname=defName)
        except samexcpt.DefinitionNotFound: return None
    # defNameCount()

    def describeSample(self, info):
        """Returns the description stored in the SAM definition of `info`."""
        return "ICARUS data " + str(info)

# class SampleProcessClass
516 
517 
518 # ------------------------------------------------------------------------------
def collapseList(l):
    """Returns the only element of `l` when `l` is a one-element collection.

    Strings, objects without a length (such as `None`) and collections with
    any other number of elements are returned unchanged.
    """
    if isinstance(l, str):
        return l
    try:
        single = len(l) == 1
        return next(iter(l)) if single else l
    except TypeError:
        # not a sized iterable: nothing to collapse
        return l
# collapseList()
525 
526 
if __name__ == "__main__":
    import sys
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)

    SampleGroup = parser.add_argument_group(title="Sample selection")
    SampleGroup.add_argument("runs", nargs="*", type=int, help="runs to process")
    # f-string, so that the actual default stages are listed in the help
    SampleGroup.add_argument("--stage", "-s", action="append",
        help=f"stages to include [{', '.join(DefaultStages)}]")
    SampleGroup.add_argument("--prjversion", "-p", action="append",
        help="project versions to include [autodetect (resource-intensive!)]")
    SampleGroup.add_argument("--stream", "-f", action="append",
        help="data streams to include (use 'any' for... any) [any]")
    SampleGroup.add_argument("--global", "-g", dest='globalDef',
        action="store_true", help="do not prepend SAM user name to definitions")

    ActionGroup = parser.add_argument_group(title="Actions")
    ActionGroup.add_argument("--check", action="store_true",
        help="prints whether the definition for the sample is available")
    ActionGroup.add_argument("--describe", action="store_true",
        help="describes an existing definition for the sample")
    ActionGroup.add_argument("--query", action="store_true",
        help="queries the definitions related to the samples")
    ActionGroup.add_argument("--defname", action="store_true",
        help="prints the name of the definitions related to the samples")
    ActionGroup.add_argument("--create", action="store_true",
        help="creates one definition per sample (use --defname to see their names)")
    ActionGroup.add_argument("--delete", action="store_true",
        help="attempts to remove one definition per sample")

    GeneralOptGroup = parser.add_argument_group(title="General options")
    GeneralOptGroup.add_argument("--test", action="store_true",
        help="tests the connection to SAM and exits")
    GeneralOptGroup.add_argument("--force", "-F", action="store_true",
        help="skips safety checks of some operations")
    GeneralOptGroup.add_argument("--fake", "--dryrun", "-n", action="store_true",
        help="does not perform actual creation and query actions")
    GeneralOptGroup.add_argument("--debug", action="store_true",
        help="enable verbose debugging output")
    GeneralOptGroup.add_argument("--version", "-V", action="version",
        version=f"%(prog)s v{__version__} ({time.asctime(__date__)})",
        help="prints the version number")

    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
    if args.stage is None: args.stage = DefaultStages
    if args.stream:
        # 'any' means no constraint on the stream
        args.stream = [ None if s == "any" else s for s in args.stream ]

    try: samweb = SAMWebClient()
    except samexcpt.Error as e:
        if args.test:
            print("Failed to connect to SAM.")
        else:
            logging.error("Failed to connect to SAM: %s", e)
        sys.exit(1)
    # try ... except
    if args.test:
        try: print(samweb.serverInfo())
        except samexcpt.Error as e:
            logging.error("Test connection to SAM failed: %s", e)
            sys.exit(1)
        print("\nConnection test succeeded.")
        sys.exit(0)
    # if test

    if not AllowAllRuns and not args.runs:
        logging.error("At least one run MUST be specified.")
        sys.exit(1)
    #

    iterateSamples = SampleBrowser(samweb)
    processInfo = SampleProcessClass(samweb,
        create=args.create, query=args.query, printDefs=args.defname,
        describe=args.describe, check=args.check, delete=args.delete,
        fake=args.fake, force=args.force, prependUser=not args.globalDef
        )

    baseSampleInfo = SampleInfo(
        run=collapseList(args.runs if args.runs else None),
        stage=collapseList(args.stage),
        stream=collapseList(args.stream), # None means any
        projectVersion=collapseList(args.prjversion), # None for autodetect
        )

    for sampleInfo in iterateSamples(baseSampleInfo):
        processInfo(sampleInfo)

    sys.exit(0)
# main
do one_file $F done echo for F in find $TOP name CMakeLists txt print
Framework includes.
S join(S const &sep, Coll const &s)
Returns a concatenation of strings in s separated by sep.
list
Definition: file_to_url.sh:28