All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
larbatch_utilities.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 ######################################################################
3 #
4 # Name: larbatch_utilities.py
5 #
6 # Purpose: This module contains low level utilities that are used in
7 # either modules project_utilities or larbatch_posix.
8 #
9 # Created: 13-Jun-2016 Herbert Greenlee
10 #
11 # The following functions are provided as interfaces to ifdh. These
12 # functions are equipped with authentication checking, timeouts and
13 # other protections.
14 #
15 # ifdh_cp - Interface for "ifdh cp."
16 # ifdh_ls - Interface for "ifdh ls."
17 # ifdh_ll - Interface for "ifdh ll."
18 # ifdh_mkdir - Interface for "ifdh mkdir."
19 # ifdh_rmdir - Interface for "ifdh rmdir."
20 # ifdh_mv - Interface for "ifdh mv."
21 # ifdh_rm - Interface for "ifdh rm."
22 # ifdh_chmod - Interface for "ifdh chmod."
23 #
24 # The following functions are provided as interfaces to posix tools
25 # with additional protections or timeouts.
26 #
27 # posix_cp - Copy file with timeout.
28 #
29 # Authentication functions.
30 #
31 # test_ticket - Raise an exception of user does not have a valid kerberos ticket.
32 # get_kca - Get a kca certificate.
33 # get_proxy - Get a grid proxy.
34 # test_kca - Get a kca certificate if necessary.
35 # text_proxy - Get a grid proxy if necessary.
36 # get_experiment - Get standard experiment name.
37 # get_user - Get authenticated user.
38 # get_prouser - Get production user.
39 # get_role - Get VO role.
40 #
41 # SAM functions.
42 #
43 # dimensions - Return sam query dimensions for stage.
44 # get_sam_metadata - Return sam metadata fcl parameters for stage.
45 # get_bluearc_server - Sam fictitious server for bluearc.
46 # get_dcache_server - Sam fictitious server for dCache.
47 # get_dropbox - Return dropbox based on sam metadata.
48 #
49 # Other functions.
50 #
51 # get_ups_products - Top level ups products.
52 # get_setup_script_path - Full path of experiment setup script.
53 # wait_for_subprocess - For use with subprocesses with timeouts.
54 # dcache_server - Return dCache server.
55 # dcache_path - Convert dCache local path to path on server.
56 # xrootd_server_port - Return xrootd server and port (as <server>:<port>).
57 # xrootd_uri - Convert dCache path to xrootd uri.
58 # gridftp_uri - Convert dCache path to gridftp uri.
59 # srm_uri - Convert dCache path to srm uri.
60 # nfs_server - Node name of a computer in which /pnfs filesystem is nfs-mounted.
61 # parse_mode - Parse the ten-character file mode string ("ls -l").
62 # check_running - Check for running project.py submission process.
63 # convert_str - Accepting unicode or bytes as input, convert to default python str.
64 # convert_bytes - Accepting unicode or bytes as input, convert to bytes.
65 # test_jobsub - Test whether jobsub_client is set up.
66 #
67 ######################################################################
68 
69 from __future__ import absolute_import
70 from __future__ import print_function
71 import sys, os
72 import stat
73 import subprocess
74 import getpass
75 import threading
76 try:
77  import queue
78 except ImportError:
79  import Queue as queue
80 from project_modules.ifdherror import IFDHError
81 
82 # Global variables.
83 
84 ticket_ok = False
85 kca_ok = False
86 proxy_ok = False
87 kca_user = ''
88 jobsub_ok = False
89 
90 # Copy file using ifdh, with timeout.
91 
92 def ifdh_cp(source, destination):
93 
94  # Get proxy.
95 
96  test_proxy()
97 
98  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
99  # are not defined (they confuse ifdh, or rather the underlying tools).
100 
101  save_vars = {}
102  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
103  if var in os.environ:
104  save_vars[var] = os.environ[var]
105  del os.environ[var]
106 
107  # Do copy.
108 
109  cmd = ['ifdh', 'cp', source, destination]
110  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
111 
112  q = queue.Queue()
113  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
114  thread.start()
115  thread.join(timeout=31000000)
116  if thread.is_alive():
117  print('Terminating subprocess.')
118  jobinfo.terminate()
119  thread.join()
120  rc = q.get()
121  jobout = convert_str(q.get())
122  joberr = convert_str(q.get())
123  if rc != 0:
124  for var in list(save_vars.keys()):
125  os.environ[var] = save_vars[var]
126  raise IFDHError(cmd, rc, jobout, joberr)
127 
128  # Restore environment variables.
129 
130  for var in list(save_vars.keys()):
131  os.environ[var] = save_vars[var]
132 
133 
134 # Ifdh ls, with timeout.
135 # Return value is list of lines returned by "ifdh ls" command.
136 
137 def ifdh_ls(path, depth):
138 
139  # Get proxy.
140 
141  test_proxy()
142 
143  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
144  # are not defined (they confuse ifdh, or rather the underlying tools).
145 
146  save_vars = {}
147  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
148  if var in os.environ:
149  save_vars[var] = os.environ[var]
150  del os.environ[var]
151 
152  # Do listing.
153 
154  cmd = ['ifdh', 'ls', path, '%d' % depth]
155  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
156 
157  q = queue.Queue()
158  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
159  thread.start()
160  thread.join(timeout=600)
161  if thread.is_alive():
162  print('Terminating subprocess.')
163  jobinfo.terminate()
164  thread.join()
165  rc = q.get()
166  jobout = convert_str(q.get())
167  joberr = convert_str(q.get())
168  if rc != 0:
169  for var in list(save_vars.keys()):
170  os.environ[var] = save_vars[var]
171  raise IFDHError(cmd, rc, jobout, joberr)
172 
173  # Restore environment variables.
174 
175  for var in list(save_vars.keys()):
176  os.environ[var] = save_vars[var]
177 
178  # Done.
179 
180  return jobout.splitlines()
181 
182 
183 # Ifdh ll, with timeout.
184 # Return value is list of lines returned by "ifdh ls" command.
185 
186 def ifdh_ll(path, depth):
187 
188  # Get proxy.
189 
190  test_proxy()
191 
192  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
193  # are not defined (they confuse ifdh, or rather the underlying tools).
194 
195  save_vars = {}
196  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
197  if var in os.environ:
198  save_vars[var] = os.environ[var]
199  del os.environ[var]
200 
201  # Do listing.
202 
203  cmd = ['ifdh', 'll', path, '%d' % depth]
204  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
205 
206  q = queue.Queue()
207  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
208  thread.start()
209  thread.join(timeout=60)
210  if thread.is_alive():
211  print('Terminating subprocess.')
212  jobinfo.terminate()
213  thread.join()
214  rc = q.get()
215  jobout = convert_str(q.get())
216  joberr = convert_str(q.get())
217  if rc != 0:
218  for var in list(save_vars.keys()):
219  os.environ[var] = save_vars[var]
220  raise IFDHError(cmd, rc, jobout, joberr)
221 
222  # Restore environment variables.
223 
224  for var in list(save_vars.keys()):
225  os.environ[var] = save_vars[var]
226 
227  # Done.
228 
229  return jobout.splitlines()
230 
231 
232 # Ifdh mkdir, with timeout.
233 
234 def ifdh_mkdir(path):
235 
236  # Get proxy.
237 
238  test_proxy()
239 
240  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
241  # are not defined (they confuse ifdh, or rather the underlying tools).
242 
243  save_vars = {}
244  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
245  if var in os.environ:
246  save_vars[var] = os.environ[var]
247  del os.environ[var]
248 
249  # Do mkdir.
250 
251  cmd = ['ifdh', 'mkdir', path]
252  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
253 
254  q = queue.Queue()
255  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
256  thread.start()
257  thread.join(timeout=60)
258  if thread.is_alive():
259  print('Terminating subprocess.')
260  jobinfo.terminate()
261  thread.join()
262  rc = q.get()
263  jobout = convert_str(q.get())
264  joberr = convert_str(q.get())
265  if rc != 0:
266  for var in list(save_vars.keys()):
267  os.environ[var] = save_vars[var]
268  raise IFDHError(cmd, rc, jobout, joberr)
269 
270  # Restore environment variables.
271 
272  for var in list(save_vars.keys()):
273  os.environ[var] = save_vars[var]
274 
275  # Done.
276 
277  return
278 
279 
280 # Ifdh rmdir, with timeout.
281 
282 def ifdh_rmdir(path):
283 
284  # Get proxy.
285 
286  test_proxy()
287 
288  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
289  # are not defined (they confuse ifdh, or rather the underlying tools).
290 
291  save_vars = {}
292  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
293  if var in os.environ:
294  save_vars[var] = os.environ[var]
295  del os.environ[var]
296 
297  # Do rmdir.
298 
299  cmd = ['ifdh', 'rmdir', path]
300  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
301 
302  q = queue.Queue()
303  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
304  thread.start()
305  thread.join(timeout=60)
306  if thread.is_alive():
307  print('Terminating subprocess.')
308  jobinfo.terminate()
309  thread.join()
310  rc = q.get()
311  jobout = convert_str(q.get())
312  joberr = convert_str(q.get())
313  if rc != 0:
314  for var in list(save_vars.keys()):
315  os.environ[var] = save_vars[var]
316  raise IFDHError(cmd, rc, jobout, joberr)
317 
318  # Restore environment variables.
319 
320  for var in list(save_vars.keys()):
321  os.environ[var] = save_vars[var]
322 
323  # Done.
324 
325  return
326 
327 
328 # Ifdh chmod, with timeout.
329 
330 def ifdh_chmod(path, mode):
331 
332  # Get proxy.
333 
334  test_proxy()
335 
336  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
337  # are not defined (they confuse ifdh, or rather the underlying tools).
338 
339  save_vars = {}
340  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
341  if var in os.environ:
342  save_vars[var] = os.environ[var]
343  del os.environ[var]
344 
345  # Do chmod.
346 
347  cmd = ['ifdh', 'chmod', '%o' % mode, path]
348  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
349 
350  q = queue.Queue()
351  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
352  thread.start()
353  thread.join(timeout=60)
354  if thread.is_alive():
355  print('Terminating subprocess.')
356  jobinfo.terminate()
357  thread.join()
358  rc = q.get()
359  jobout = convert_str(q.get())
360  joberr = convert_str(q.get())
361  if rc != 0:
362  print('Warning: ifdh chmod failed for path %s' % path)
363 
364  # Restore environment variables.
365 
366  for var in list(save_vars.keys()):
367  os.environ[var] = save_vars[var]
368 
369  # Done.
370 
371  return
372 
373 
374 # Ifdh mv, with timeout.
375 
376 def ifdh_mv(src, dest):
377 
378  # Get proxy.
379 
380  test_proxy()
381 
382  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
383  # are not defined (they confuse ifdh, or rather the underlying tools).
384 
385  save_vars = {}
386  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
387  if var in os.environ:
388  save_vars[var] = os.environ[var]
389  del os.environ[var]
390 
391  # Do rename.
392 
393  cmd = ['ifdh', 'mv', src, dest]
394  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
395 
396  q = queue.Queue()
397  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
398  thread.start()
399  thread.join(timeout=60)
400  if thread.is_alive():
401  print('Terminating subprocess.')
402  jobinfo.terminate()
403  thread.join()
404  rc = q.get()
405  jobout = convert_str(q.get())
406  joberr = convert_str(q.get())
407  if rc != 0:
408  for var in list(save_vars.keys()):
409  os.environ[var] = save_vars[var]
410  raise IFDHError(cmd, rc, jobout, joberr)
411 
412  # Restore environment variables.
413 
414  for var in list(save_vars.keys()):
415  os.environ[var] = save_vars[var]
416 
417  # Done.
418 
419  return
420 
421 
422 # Ifdh rm, with timeout.
423 
424 def ifdh_rm(path):
425 
426  # Get proxy.
427 
428  test_proxy()
429 
430  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
431  # are not defined (they confuse ifdh, or rather the underlying tools).
432 
433  save_vars = {}
434  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
435  if var in os.environ:
436  save_vars[var] = os.environ[var]
437  del os.environ[var]
438 
439  # Do delete.
440 
441  cmd = ['ifdh', 'rm', path]
442  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
443 
444  q = queue.Queue()
445  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
446  thread.start()
447  thread.join(timeout=60)
448  if thread.is_alive():
449  print('Terminating subprocess.')
450  jobinfo.terminate()
451  thread.join()
452  rc = q.get()
453  jobout = convert_str(q.get())
454  joberr = convert_str(q.get())
455  if rc != 0:
456  for var in list(save_vars.keys()):
457  os.environ[var] = save_vars[var]
458  raise IFDHError(cmd, rc, jobout, joberr)
459 
460  # Restore environment variables.
461 
462  for var in list(save_vars.keys()):
463  os.environ[var] = save_vars[var]
464 
465  # Done.
466 
467  return
468 
469 
470 # Posix copy with timeout.
471 
472 def posix_cp(source, destination):
473 
474  cmd = ['cp', source, destination]
475 
476  # Fork buffer process.
477 
478  buffer_pid = os.fork()
479  if buffer_pid == 0:
480 
481  # In child process.
482  # Launch cp subprocess.
483 
484  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
485 
486  q = queue.Queue()
487  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
488  thread.start()
489  thread.join(timeout=600)
490  if thread.is_alive():
491 
492  # Subprocess did not finish (may be hanging and unkillable).
493  # Try to kill the subprocess and exit process.
494  # Unkillable process will become detached.
495 
496  print('Terminating subprocess.')
497  jobinfo.kill()
498  os._exit(1)
499 
500  else:
501 
502  # Subprocess finished normally.
503 
504  rc = q.get()
505  jobout = convert_str(q.get())
506  joberr = convert_str(q.get())
507  os._exit(rc)
508 
509  else:
510 
511  # In parent process.
512  # Wait for buffer subprocess to finish.
513 
514  buffer_result = os.waitpid(buffer_pid, 0)
515  rc = buffer_result[1]/256
516  if rc != 0:
517  raise IFDHError(cmd, rc, '', '')
518 
519  # Done.
520 
521  return
522 
523 
524 # Function to wait for a subprocess to finish and fetch return code,
525 # standard output, and standard error.
526 # Call this function like this:
527 #
528 # q = Queue.Queue()
529 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
530 # wait_for_subprocess(jobinfo, q, input)
531 # rc = q.get() # Return code.
532 # jobout = q.get() # Standard output
533 # joberr = q.get() # Standard error
534 
535 
536 def wait_for_subprocess(jobinfo, q, input=None):
537  jobout, joberr = jobinfo.communicate(input)
538  rc = jobinfo.poll()
539  q.put(rc)
540  q.put(jobout)
541  q.put(joberr)
542  return
543 
544 
545 # Test whether user has a valid kerberos ticket. Raise exception if no.
546 
548  global ticket_ok
549  if not ticket_ok:
550  ok = subprocess.call(['klist', '-s'], stdout=-1, stderr=-1)
551  if ok != 0:
552  raise RuntimeError('Please get a kerberos ticket.')
553  ticket_ok = True
554  return ticket_ok
555 
556 
557 # Get kca certificate.
558 
559 def get_kca():
560 
561  global kca_ok
562  kca_ok = False
563 
564  # First, make sure we have a kerberos ticket.
565 
566  krb_ok = test_ticket()
567  if krb_ok:
568 
569  # Get kca certificate.
570 
571  kca_ok = False
572  try:
573  subprocess.check_call(['kx509'], stdout=-1, stderr=-1)
574  kca_ok = True
575  except:
576  pass
577 
578  # Done
579 
580  return kca_ok
581 
582 
583 # Get grid proxy.
584 # This implementation should be good enough for experiments in the fermilab VO.
585 # Experiments not in the fermilab VO (lbne/dune) should override this function
586 # in experiment_utilities.py.
587 
588 def get_proxy():
589 
590  global proxy_ok
591  proxy_ok = False
592 
593  # Make sure we have a valid certificate.
594 
595  test_kca()
596 
597  # Get proxy using either specified cert+key or default cert.
598 
599  if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
600  cmd=['voms-proxy-init',
601  '-rfc',
602  '-cert', os.environ['X509_USER_CERT'],
603  '-key', os.environ['X509_USER_KEY'],
604  '-voms', 'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
605  try:
606  subprocess.check_call(cmd, stdout=-1, stderr=-1)
607  proxy_ok = True
608  except:
609  pass
610  pass
611  else:
612  cmd=['voms-proxy-init',
613  '-noregen',
614  '-rfc',
615  '-voms',
616  'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
617  try:
618  subprocess.check_call(cmd, stdout=-1, stderr=-1)
619  proxy_ok = True
620  except:
621  pass
622 
623  # Done
624 
625  return proxy_ok
626 
627 
628 # Test whether user has a valid kca certificate. If not, try to get a new one.
629 
630 def test_kca():
631  global kca_ok
632  if not kca_ok:
633  try:
634  if 'X509_USER_PROXY' in os.environ:
635  subprocess.check_call(['voms-proxy-info',
636  '-file', os.environ['X509_USER_PROXY'],
637  '-exists'], stdout=-1, stderr=-1)
638  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
639  subprocess.check_call(['voms-proxy-info',
640  '-file', os.environ['X509_USER_CERT'],
641  '-exists'], stdout=-1, stderr=-1)
642  else:
643  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
644 
645  # Workaround jobsub bug by setting environment variable X509_USER_PROXY to
646  # point to the default location of the kca certificate.
647 
648  x509_path = convert_str(subprocess.check_output(['voms-proxy-info', '-path'], stderr=-1))
649  os.environ['X509_USER_PROXY'] = x509_path.strip()
650 
651  kca_ok = True
652  except:
653  pass
654 
655  # If at this point we don't have a kca certificate, try to get one.
656 
657  if not kca_ok:
658  get_kca()
659 
660  # Final checkout.
661 
662  if not kca_ok:
663  try:
664  if 'X509_USER_PROXY' in os.environ:
665  subprocess.check_call(['voms-proxy-info',
666  '-file', os.environ['X509_USER_PROXY'],
667  '-exists'], stdout=-1, stderr=-1)
668  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
669  subprocess.check_call(['voms-proxy-info',
670  '-file', os.environ['X509_USER_CERT'],
671  '-exists'], stdout=-1, stderr=-1)
672  else:
673  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
674  kca_ok = True
675  except:
676  raise RuntimeError('Please get a kca certificate.')
677  return kca_ok
678 
679 
680 # Test whether user has a valid grid proxy. If not, try to get a new one.
681 
683  global proxy_ok
684  if not proxy_ok:
685  try:
686  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
687  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
688  proxy_ok = True
689  except:
690  pass
691 
692  # If at this point we don't have a grid proxy, try to get one.
693 
694  if not proxy_ok:
695  get_proxy()
696 
697  # Final checkout.
698 
699  if not proxy_ok:
700  try:
701  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
702  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
703  proxy_ok = True
704  except:
705  raise RuntimeError('Please get a grid proxy.')
706  return proxy_ok
707 
708 # Test whether jobsub_client has been set up.
709 
711  global jobsub_ok
712  if not jobsub_ok:
713 
714  # Look for command jobsub_submit on execution path.
715 
716  try:
717  jobinfo = subprocess.Popen(['which', 'jobsub_submit'],
718  stdout=subprocess.PIPE,
719  stderr=subprocess.PIPE)
720  jobout, joberr = jobinfo.communicate()
721  jobout = convert_str(jobout)
722  joberr = convert_str(joberr)
723  jobsub_path = jobout.splitlines()[0].strip()
724  if jobsub_path != '':
725  jobsub_ok = True
726  except:
727  pass
728 
729  if not jobsub_ok:
730  print('Please set up jobsub_client')
731  sys.exit(1)
732 
733  return jobsub_ok
734 
735 # Return dCache server.
736 
738  return "fndca1.fnal.gov"
739 
740 
741 # Convert a local pnfs path to the path on the dCache server.
742 # Return the input path unchanged if it isn't on dCache.
743 
744 def dcache_path(path):
745  if path.startswith('/pnfs/') and not path.startswith('/pnfs/fnal.gov/usr/'):
746  return '/pnfs/fnal.gov/usr/' + path[6:]
747 
748 
749 # Return xrootd server and port.
750 
752  return dcache_server() + ':1094'
753 
754 
755 # Convert a pnfs path to xrootd uri.
756 # Return the input path unchanged if it isn't on dCache.
757 
758 def xrootd_uri(path):
759  if path.startswith('/pnfs/'):
760  return 'root://' + xrootd_server_port() + dcache_path(path)
761  else:
762  return path
763 
764 
765 # Convert a pnfs path to gridftp uri.
766 # Return the input path unchanged if it isn't on dCache.
767 
768 def gridftp_uri(path):
769  if path.startswith('/pnfs/'):
770  return 'gsiftp://' + dcache_server() + dcache_path(path)
771  else:
772  return path
773 
774 
775 # Convert a pnfs path to srm uri.
776 # Return the input path unchanged if it isn't on dCache.
777 
778 def srm_uri(path):
779  if path.startswith('/pnfs/'):
780  return 'srm://fndca1.fnal.gov:8443/srm/managerv2?SFN=/pnfs/fnal.gov/usr/' + path[6:]
781  else:
782  return path
783 
784 
785 # Return the name of a computer with login access that has the /pnfs
786 # filesystem nfs-mounted. This function makes use of the $EXPERIMENT
787 # environment variable (as does ifdh), which must be set.
788 
790  return '%sgpvm01.fnal.gov' % os.environ['EXPERIMENT']
791 
792 
793 # Parse the ten-character file mode string as returned by "ls -l"
794 # and return mode bit masek.
795 
796 def parse_mode(mode_str):
797 
798  mode = 0
799 
800  # File type.
801 
802  if mode_str[0] == 'b':
803  mode += stat.S_IFBLK
804  elif mode_str[0] == 'c':
805  mode += stat.S_IFCHR
806  elif mode_str[0] == 'd':
807  mode += stat.S_IFDIR
808  elif mode_str[0] == 'l':
809  mode += stat.S_IFLNK
810  elif mode_str[0] == 'p':
811  mode += stat.S_IFIFO
812  elif mode_str[0] == 's':
813  mode += stat.S_IFSOCK
814  elif mode_str[0] == '-':
815  mode += stat.S_IFREG
816 
817  # File permissions.
818 
819  # User triad (includes setuid).
820 
821  if mode_str[1] == 'r':
822  mode += stat.S_IRUSR
823  if mode_str[2] == 'w':
824  mode += stat.S_IWUSR
825  if mode_str[3] == 'x':
826  mode += stat.S_IXUSR
827  elif mode_str[3] == 's':
828  mode += stat.S_ISUID
829  mode += stat.S_IXUSR
830  elif mode_str[3] == 'S':
831  mode += stat.S_ISUID
832 
833  # Group triad (includes setgid).
834 
835  if mode_str[4] == 'r':
836  mode += stat.S_IRGRP
837  if mode_str[5] == 'w':
838  mode += stat.S_IWGRP
839  if mode_str[6] == 'x':
840  mode += stat.S_IXGRP
841  elif mode_str[6] == 's':
842  mode += stat.S_ISGID
843  mode += stat.S_IXGRP
844  elif mode_str[6] == 'S':
845  mode += stat.S_ISGID
846 
847  # World triad (includes sticky bit).
848 
849  if mode_str[7] == 'r':
850  mode += stat.S_IROTH
851  if mode_str[8] == 'w':
852  mode += stat.S_IWOTH
853  if mode_str[9] == 'x':
854  mode += stat.S_IXOTH
855  elif mode_str[9] == 't':
856  mode += stat.S_ISVTX
857  mode += stat.S_IXOTH
858  elif mode_str[9] == 'T':
859  mode += stat.S_ISVTX
860 
861  # Done
862 
863  return mode
864 
865 # Function to return the current experiment.
866 # The following places for obtaining this information are
867 # tried (in order):
868 #
869 # 1. Environment variable $EXPERIMENT.
870 # 2. Environment variable $SAM_EXPERIMENT.
871 # 3. Hostname (up to "gpvm").
872 #
873 # Raise an exception if none of the above methods works.
874 #
875 
877 
878  exp = ''
879  for ev in ('EXPERIMENT', 'SAM_EXPERIMENT'):
880  if ev in os.environ:
881  exp = os.environ[ev]
882  break
883 
884  if not exp:
885  hostname = socket.gethostname()
886  n = hostname.find('gpvm')
887  if n > 0:
888  exp = hostname[:n]
889 
890  if not exp:
891  raise RuntimeError('Unable to determine experiment.')
892 
893  return exp
894 
895 
896 # Get role (normally 'Analysis' or 'Production').
897 
898 def get_role():
899 
900  # If environment variable ROLE is defined, use that. Otherwise, make
901  # an educated guess based on user name.
902 
903  result = 'Analysis' # Default role.
904 
905  # Check environment variable $ROLE.
906 
907  if 'ROLE' in os.environ:
908  result = os.environ['ROLE']
909 
910  # Otherwise, check user.
911 
912  else:
913  prouser = get_experiment() + 'pro'
914  user = getpass.getuser()
915  if user == prouser:
916  result = 'Production'
917 
918  return result
919 
920 
921 # Function to return a comma-separated list of run-time top level ups products.
922 
924  return get_experiment() + 'code'
925 
926 
927 # Function to return path of experiment bash setup script that is valid
928 # on the node where this script is being executed.
929 # This function should be overridden in <experiment>_utilities.py.
930 
932  raise RuntimeError('Function get_setup_script_path not implemented.')
933 
934 
935 # Function to return dimension string for project, stage.
936 # This function should be overridden in experiment_utilities.py
937 
938 def dimensions(project, stage, ana=False):
939  raise RuntimeError('Function dimensions not implemented.')
940 
941 
942 # Function to return dimension string for project, stage, including data stream.
943 
944 def dimensions_datastream(project, stage, ana=False, index=0):
945 
946  # Default same as no data stream.
947 
948  dim = dimensions(project, stage, ana=ana)
949 
950  # Append data stream dimension, if appropriate.
951 
952  if ana:
953  if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
954  dim1 = '( data_stream %s and %s )' % (stage.ana_data_stream[index], dim)
955  dim = dim1
956  else:
957  if stage.data_stream != None and len(stage.data_stream) > 0:
958  dim1 = '( data_stream %s and %s )' % (stage.data_stream[index], dim)
959  dim = dim1
960 
961  # Done.
962 
963  return dim
964 
965 
966 # Function to return the production user name
967 
969  return get_experiment() + 'pro'
970 
971 
972 # Function to return the fictitious disk server node
973 # name used by sam for bluearc disks.
974 
976  return get_experiment() + 'data:'
977 
978 
979 # Function to return the fictitious disk server node
980 # name used by sam for dCache disks.
981 
983  return 'fnal-dcache:'
984 
985 
986 # Function to determine dropbox directory based on sam metadata.
987 # Raise an exception if the specified file doesn't have metadata.
988 # This function should be overridden in <experiment>_utilities module.
989 
990 def get_dropbox(filename):
991  raise RuntimeError('Function get_dropbox not implemented.')
992 
993 
994 # Function to return string containing sam metadata in the form
995 # of an fcl configuraiton. It is intended that this function
996 # may be overridden in experiment_utilities.py.
997 
998 def get_sam_metadata(project, stage):
999  result = ''
1000  return result
1001 
1002 
1003 # Get authenticated user (from kerberos ticket, not $USER).
1004 
1005 def get_user():
1006 
1007  # See if we have a cached value for user.
1008 
1009  global kca_user
1010  if kca_user != '':
1011  return kca_user
1012 
1013  # Return production user name if Role is Production
1014 
1015  if get_role() == 'Production':
1016  return get_prouser()
1017 
1018  else:
1019 
1020  # First make sure we have a kca certificate (raise exception if not).
1021 
1022  test_kca()
1023 
1024  # Return user name from certificate if Role is Analysis
1025 
1026  subject = ''
1027  if 'X509_USER_PROXY' in os.environ:
1028  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1029  '-file', os.environ['X509_USER_PROXY'],
1030  '-subject'], stderr=-1))
1031  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
1032  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1033  '-file', os.environ['X509_USER_CERT'],
1034  '-subject'], stderr=-1))
1035  else:
1036  subject = convert_str(subprocess.check_output(['voms-proxy-info', '-subject'],
1037  stderr=-1))
1038 
1039  # Get the last non-numeric CN
1040 
1041  cn = ''
1042  while cn == '':
1043  n = subject.rfind('/CN=')
1044  if n >= 0:
1045  cn = subject[n+4:]
1046  if cn.strip().isdigit():
1047  cn = ''
1048  subject = subject[:n]
1049  else:
1050  break
1051 
1052  # Truncate everything after the first '/'.
1053 
1054  n = cn.find('/')
1055  if n >= 0:
1056  cn = cn[:n]
1057 
1058  # Truncate everything after the first newline.
1059 
1060  n = cn.find('\n')
1061  if n >= 0:
1062  cn = cn[:n]
1063 
1064  # Truncate everything before the first ":" (UID:).
1065 
1066  n = cn.find(':')
1067  if n >= 0:
1068  cn = cn[n+1:]
1069 
1070  # Done (maybe).
1071 
1072  if cn != '':
1073  return cn
1074 
1075  # Something went wrong...
1076 
1077  raise RuntimeError('Unable to determine authenticated user.')
1078 
1079 
1080 # Function to check whether there is a running project.py process on this node
1081 # with the specified xml file and stage.
1082 #
1083 # This function works by checking the contents of /proc. Each process is checked
1084 # for the following properties.
1085 #
1086 # 1. Owned by same uid as this process.
1087 # 2. Command line.
1088 # a) project.py
1089 # b) Matching --xml option (exact match).
1090 # c) Matching --stage option (exact match).
1091 # d) --submit or --makeup option.
1092 #
1093 # Arguments xml and stage should be strings, and must match exactly command
1094 # line arguments.
1095 
1096 def check_running(xmlname, stagename):
1097 
1098  result = 0
1099 
1100  # Look over pids in /proc.
1101 
1102  for pid in os.listdir('/proc'):
1103  if pid.isdigit() and int(pid) != os.getpid():
1104  procfile = os.path.join('/proc', pid)
1105  try:
1106  pstat = os.stat(procfile)
1107 
1108  # Only look at processes that match this process uid.
1109 
1110  if pstat.st_uid == os.getuid():
1111 
1112  # Get command line.
1113 
1114  cmdfile = os.path.join('/proc', pid, 'cmdline')
1115  cmd = open(cmdfile).read()
1116  words = cmd.split('\0')
1117 
1118  # Check options.
1119 
1120  project = 0
1121  xml = 0
1122  stage = 0
1123  xmlmatch = 0
1124  stagematch = 0
1125  submit = 0
1126  makeup = 0
1127 
1128  for word in words:
1129 
1130  # Check command.
1131 
1132  if word.endswith('project.py'):
1133  project = 1
1134 
1135  # Check arguments.
1136 
1137  if xml == 1 and word == xmlname:
1138  xmlmatch = 1
1139  elif stage == 1 and word == stagename:
1140  stagematch = 1
1141 
1142  xml = 0
1143  stage = 0
1144 
1145  # Check options.
1146 
1147  if word == '--xml':
1148  xml = 1
1149  elif word == '--stage':
1150  stage = 1
1151  elif word == '--submit':
1152  submit = 1
1153  elif word == '--makeup':
1154  makeup = 1
1155 
1156  if project != 0 and submit+makeup != 0 and xmlmatch != 0 and stagematch != 0:
1157  result = 1
1158  break
1159 
1160  except:
1161  pass
1162 
1163  # Done.
1164 
1165  return result
1166 
1167 
1168 # Convert bytes or unicode string to default python str type.
1169 # Works on python 2 and python 3.
1170 
1172 
1173  result = ''
1174 
1175  if type(s) == type(''):
1176 
1177  # Already a default str.
1178  # Just return the original.
1179 
1180  result = s
1181 
1182  elif type(s) == type(u''):
1183 
1184  # Unicode and not str.
1185  # Convert to bytes.
1186 
1187  result = s.encode()
1188 
1189  elif type(s) == type(b''):
1190 
1191  # Bytes and not str.
1192  # Convert to unicode.
1193 
1194  result = s.decode()
1195 
1196  else:
1197 
1198  # Last resort, use standard str conversion.
1199 
1200  result = str(s)
1201 
1202  return result
1203 
1204 
1205 # Convert bytes or unicode string to bytes.
1206 # Works on python 2 and python 3.
1207 
1209 
1210  result = ''
1211 
1212  if type(s) == type(b''):
1213 
1214  # Already bytes.
1215  # Return the original.
1216 
1217  result = s
1218 
1219  elif type(s) == type(u''):
1220 
1221  # Unicode to bytes.
1222 
1223  result = s.encode()
1224 
1225  else:
1226 
1227  # Anything else, just return the original.
1228 
1229  result = s
1230 
1231  return result
1232 
1233 
1234 # Import experiment-specific utilities. In this imported module, one can
1235 # override any function or symbol defined above, or add new ones.
1236 
1237 from experiment_utilities import *
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
open(RACETRACK) or die("Could not open file $RACETRACK for writing")