All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Functions | Variables
python.larbatch_utilities Namespace Reference

Functions

def ifdh_cp
 
def ifdh_ls
 
def ifdh_ll
 
def ifdh_mkdir
 
def ifdh_rmdir
 
def ifdh_chmod
 
def ifdh_mv
 
def ifdh_rm
 
def posix_cp
 
def wait_for_subprocess
 
def test_ticket
 
def get_kca
 
def get_proxy
 
def test_kca
 
def test_proxy
 
def test_jobsub
 
def dcache_server
 
def dcache_path
 
def xrootd_server_port
 
def xrootd_uri
 
def gridftp_uri
 
def srm_uri
 
def nfs_server
 
def parse_mode
 
def get_experiment
 
def get_role
 
def get_ups_products
 
def get_setup_script_path
 
def dimensions
 
def dimensions_datastream
 
def get_prouser
 
def get_bluearc_server
 
def get_dcache_server
 
def get_dropbox
 
def get_sam_metadata
 
def get_user
 
def check_running
 
def convert_str
 
def convert_bytes
 

Variables

 ticket_ok = False
 
 kca_ok = False
 
 proxy_ok = False
 
string kca_user = ''
 
 jobsub_ok = False
 

Function Documentation

def python.larbatch_utilities.check_running (   xmlname,
  stagename 
)

Definition at line 1096 of file larbatch_utilities.py.

def check_running(xmlname, stagename):
    """Return 1 if a matching project.py submission is running here, else 0.

    Scans /proc for processes (other than this one) owned by the current
    uid whose command line contains all of the following:

      * a word ending in 'project.py',
      * '--xml' immediately followed by exactly xmlname,
      * '--stage' immediately followed by exactly stagename,
      * '--submit' or '--makeup'.

    Arguments xmlname and stagename should be strings and must match the
    command line arguments exactly.
    """

    result = 0

    # Look over pids in /proc.

    for pid in os.listdir('/proc'):
        if not pid.isdigit() or int(pid) == os.getpid():
            continue

        try:

            # Only look at processes that match this process uid.

            if os.stat(os.path.join('/proc', pid)).st_uid != os.getuid():
                continue

            # Get command line.  The context manager closes the file
            # promptly instead of leaking one descriptor per process.

            with open(os.path.join('/proc', pid, 'cmdline')) as cmdfile:
                words = cmdfile.read().split('\0')

        except (OSError, IOError, ValueError):

            # Process exited, is unreadable, or has an undecodable command
            # line.  This scan is best effort, so just skip it.

            continue

        # Check options.

        project = False
        xmlmatch = False
        stagematch = False
        submit_or_makeup = False
        previous = ''

        for word in words:

            # Check command.

            if word.endswith('project.py'):
                project = True

            # Check arguments (value must directly follow its option).

            if previous == '--xml' and word == xmlname:
                xmlmatch = True
            elif previous == '--stage' and word == stagename:
                stagematch = True

            # Check options.

            if word in ('--submit', '--makeup'):
                submit_or_makeup = True

            previous = word

        if project and submit_or_makeup and xmlmatch and stagematch:
            result = 1
            break

    # Done.

    return result
1167 
1168 
1169 # Convert bytes or unicode string to default python str type.
1170 # Works on python 2 and python 3.
open(RACETRACK) or die("Could not open file $RACETRACK for writing")
def python.larbatch_utilities.convert_bytes (   s)

Definition at line 1208 of file larbatch_utilities.py.

def convert_bytes(s):
    """Convert a bytes or unicode string to bytes (python 2 and python 3).

    Unicode strings (including subclasses) are encoded with the default
    encoding.  Bytes and any non-string object are returned unchanged.
    """

    # type(u'') is unicode on python 2 and str on python 3.  isinstance
    # (rather than an exact type comparison) also covers subclasses.

    if isinstance(s, type(u'')):
        return s.encode()

    # Already bytes, or not a string at all: return the original.

    return s
1233 
1234 
1235 # Import experiment-specific utilities. In this imported module, one can
1236 # override any function or symbol defined above, or add new ones.
def python.larbatch_utilities.convert_str (   s)

Definition at line 1171 of file larbatch_utilities.py.

def convert_str(s):
    """Convert a bytes or unicode string to the default python str type.

    Works on python 2 and python 3.  Objects that are not strings are
    converted with the standard str() conversion.
    """

    # Already a default str (or a subclass of it): return the original.

    if isinstance(s, str):
        return s

    # Unicode and not str (python 2 only): convert to bytes.

    if isinstance(s, type(u'')):
        return s.encode()

    # Bytes and not str (python 3 only): convert to unicode.

    if isinstance(s, bytes):
        return s.decode()

    # Last resort, use standard str conversion.

    return str(s)
1204 
1205 
1206 # Convert bytes or unicode string to bytes.
1207 # Works on python 2 and python 3.
def python.larbatch_utilities.dcache_path (   path)

Definition at line 744 of file larbatch_utilities.py.

def dcache_path(path):
    """Convert a local pnfs path to the path on the dCache server.

    '/pnfs/<rest>' becomes '/pnfs/fnal.gov/usr/<rest>'.  A path that is
    already in server form, or that is not under /pnfs/ at all, is
    returned unchanged (previously the function fell off the end and
    returned None in those cases).
    """
    if path.startswith('/pnfs/') and not path.startswith('/pnfs/fnal.gov/usr/'):
        return '/pnfs/fnal.gov/usr/' + path[6:]
    return path
748 
749 
750 # Return xrootd server and port.
def python.larbatch_utilities.dcache_server ( )

Definition at line 737 of file larbatch_utilities.py.

def dcache_server():
    """Return the host name of the dCache server."""
    server = "fndca1.fnal.gov"
    return server
740 
741 
742 # Convert a local pnfs path to the path on the dCache server.
743 # Return the input path unchanged if it isn't on dCache.
def python.larbatch_utilities.dimensions (   project,
  stage,
  ana = False 
)

Definition at line 938 of file larbatch_utilities.py.

def dimensions(project, stage, ana=False):
    """Placeholder for the dimension string of a project stage.

    This base implementation always raises RuntimeError.
    """
    message = 'Function dimensions not implemented.'
    raise RuntimeError(message)
941 
942 
943 # Function to return dimension string for project, stage, including data stream.
def python.larbatch_utilities.dimensions_datastream (   project,
  stage,
  ana = False,
  index = 0 
)

Definition at line 944 of file larbatch_utilities.py.

def dimensions_datastream(project, stage, ana=False, index=0):
    """Return the dimension string for project and stage, including data stream.

    Starts from dimensions(project, stage, ana=ana).  If the stage has a
    non-empty data stream list (stage.ana_data_stream when ana is true,
    stage.data_stream otherwise), the entry at position index is added as
    a 'data_stream' constraint.
    """

    # Default same as no data stream.

    dim = dimensions(project, stage, ana=ana)

    # Append data stream dimension, if appropriate.

    streams = stage.ana_data_stream if ana else stage.data_stream
    if streams is not None and len(streams) > 0:
        dim = '( data_stream %s and %s )' % (streams[index], dim)

    # Done.

    return dim
965 
966 
967 # Function to return the production user name
def python.larbatch_utilities.get_bluearc_server ( )

Definition at line 975 of file larbatch_utilities.py.

def get_bluearc_server():
    """Return the experiment name followed by 'data:'."""
    experiment = get_experiment()
    return experiment + 'data:'
978 
979 
980 # Function to return the fictitious disk server node
981 # name used by sam for dCache disks.
def python.larbatch_utilities.get_dcache_server ( )

Definition at line 982 of file larbatch_utilities.py.

def get_dcache_server():
    """Return the disk server node name string 'fnal-dcache:'."""
    node = 'fnal-dcache:'
    return node
985 
986 
987 # Function to determine dropbox directory based on sam metadata.
988 # Raise an exception if the specified file doesn't have metadata.
989 # This function should be overridden in <experiment>_utilities module.
def python.larbatch_utilities.get_dropbox (   filename)

Definition at line 990 of file larbatch_utilities.py.

def get_dropbox(filename):
    """Placeholder for the dropbox directory lookup of a file.

    This base implementation always raises RuntimeError.
    """
    message = 'Function get_dropbox not implemented.'
    raise RuntimeError(message)
993 
994 
995 # Function to return string containing sam metadata in the form
996 # of an fcl configuration. It is intended that this function
997 # may be overridden in experiment_utilities.py.
def python.larbatch_utilities.get_experiment ( )

Definition at line 876 of file larbatch_utilities.py.

def get_experiment():
    """Return the current experiment name.

    The first of the environment variables EXPERIMENT and SAM_EXPERIMENT
    that is defined supplies the value.  If that yields nothing, the part
    of the host name preceding 'gpvm' is used.  Raises RuntimeError if the
    result is still empty.
    """

    # First defined environment variable wins (even if it is empty).

    candidates = ('EXPERIMENT', 'SAM_EXPERIMENT')
    exp = next((os.environ[ev] for ev in candidates if ev in os.environ), '')

    # Fall back on host name (up to "gpvm").

    if not exp:
        host = socket.gethostname()
        cut = host.find('gpvm')
        if cut > 0:
            exp = host[:cut]

    if exp:
        return exp

    raise RuntimeError('Unable to determine experiment.')
895 
896 
897 # Get role (normally 'Analysis' or 'Production').
def python.larbatch_utilities.get_kca ( )

Definition at line 559 of file larbatch_utilities.py.

def get_kca():
    """Try to obtain a kca certificate by running kx509.

    Resets the module-level flag kca_ok, then, if test_ticket() reports a
    kerberos ticket, runs 'kx509'.  Returns the new value of kca_ok (True
    only if kx509 exited successfully).
    """

    global kca_ok
    kca_ok = False

    # First, make sure we have a kerberos ticket.

    if test_ticket():

        # Get kca certificate.  Output is discarded via the null device;
        # check_call with pipes that nobody reads can block once the pipe
        # buffer fills.

        try:
            with open(os.devnull, 'w') as devnull:
                subprocess.check_call(['kx509'], stdout=devnull, stderr=devnull)
            kca_ok = True
        except (subprocess.CalledProcessError, OSError, IOError):

            # kx509 failed or could not be run; leave kca_ok False.

            pass

    # Done

    return kca_ok
582 
583 
584 # Get grid proxy.
585 # This implementation should be good enough for experiments in the fermilab VO.
586 # Experiments not in the fermilab VO (lbne/dune) should override this function
587 # in experiment_utilities.py.
def python.larbatch_utilities.get_prouser ( )

Definition at line 968 of file larbatch_utilities.py.

def get_prouser():
    """Return the experiment name followed by 'pro'."""
    experiment = get_experiment()
    return experiment + 'pro'
971 
972 
973 # Function to return the fictitious disk server node
974 # name used by sam for bluearc disks.
def python.larbatch_utilities.get_proxy ( )

Definition at line 588 of file larbatch_utilities.py.

def get_proxy():
    """Try to obtain a grid proxy with voms-proxy-init.

    Resets the module-level flag proxy_ok, calls test_kca(), then runs
    voms-proxy-init for 'fermilab:/fermilab/<experiment>/Role=<role>'.
    If both X509_USER_CERT and X509_USER_KEY are set in the environment
    they are passed as -cert/-key; otherwise the default certificate is
    used with -noregen.  Returns the new value of proxy_ok.
    """

    global proxy_ok
    proxy_ok = False

    # Make sure we have a valid certificate.

    test_kca()

    # Get proxy using either specified cert+key or default cert.

    voms = 'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())
    if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
        cmd = ['voms-proxy-init',
               '-rfc',
               '-cert', os.environ['X509_USER_CERT'],
               '-key', os.environ['X509_USER_KEY'],
               '-voms', voms]
    else:
        cmd = ['voms-proxy-init',
               '-noregen',
               '-rfc',
               '-voms', voms]

    # Output is discarded via the null device; check_call with pipes that
    # nobody reads can block once the pipe buffer fills.

    try:
        with open(os.devnull, 'w') as devnull:
            subprocess.check_call(cmd, stdout=devnull, stderr=devnull)
        proxy_ok = True
    except (subprocess.CalledProcessError, OSError, IOError):

        # voms-proxy-init failed or could not be run; leave proxy_ok False.

        pass

    # Done

    return proxy_ok
627 
628 
629 # Test whether user has a valid kca certificate. If not, try to get a new one.
def python.larbatch_utilities.get_role ( )

Definition at line 898 of file larbatch_utilities.py.

def get_role():
    """Return the role, normally 'Analysis' or 'Production'.

    The environment variable ROLE is used verbatim if it is defined.
    Otherwise the role is 'Production' when the current user name equals
    the experiment name followed by 'pro', and 'Analysis' in all other
    cases.
    """

    # Environment variable $ROLE takes precedence.

    if 'ROLE' in os.environ:
        return os.environ['ROLE']

    # Otherwise, make an educated guess based on user name.

    prouser = get_experiment() + 'pro'
    if getpass.getuser() == prouser:
        return 'Production'

    # Default role.

    return 'Analysis'
920 
921 
922 # Function to return a comma-separated list of run-time top level ups products.
def python.larbatch_utilities.get_sam_metadata (   project,
  stage 
)

Definition at line 998 of file larbatch_utilities.py.

def get_sam_metadata(project, stage):
    """Return sam metadata for project and stage as a string.

    This base implementation ignores its arguments and returns an empty
    string.
    """
    return ''
1002 
1003 
1004 # Get authenticated user (from kerberos ticket, not $USER).
def python.larbatch_utilities.get_setup_script_path ( )

Definition at line 931 of file larbatch_utilities.py.

def get_setup_script_path():
    """Placeholder for the path of the experiment setup script.

    This base implementation always raises RuntimeError.
    """
    raise RuntimeError('Function get_setup_script_path not implemented.')
934 
935 
936 # Function to return dimension string for project, stage.
937 # This function should be overridden in experiment_utilities.py
def python.larbatch_utilities.get_ups_products ( )

Definition at line 923 of file larbatch_utilities.py.

def get_ups_products():
    """Return the experiment name followed by 'code'."""
    experiment = get_experiment()
    return experiment + 'code'
926 
927 
928 # Function to return path of experiment bash setup script that is valid
929 # on the node where this script is being executed.
930 # This function should be overridden in <experiment>_utilities.py.
def python.larbatch_utilities.get_user ( )

Definition at line 1005 of file larbatch_utilities.py.

def get_user():
    """Return the authenticated user name.

    Returns the module-level kca_user if it is non-empty.  If get_role()
    is 'Production', returns get_prouser().  Otherwise calls test_kca()
    and extracts the user from the subject printed by
    'voms-proxy-info -subject', using the last '/CN=' field that is not
    purely numeric.  Raises RuntimeError if no user can be extracted.
    """

    # See if we have a cached value for user.

    global kca_user
    if kca_user != '':
        return kca_user

    # Return production user name if Role is Production

    if get_role() == 'Production':
        return get_prouser()

    # First make sure we have a kca certificate (raise exception if not).

    test_kca()

    # Build the voms-proxy-info command.  An explicit proxy file takes
    # precedence over an explicit cert+key pair; otherwise use the default.

    query = ['voms-proxy-info']
    if 'X509_USER_PROXY' in os.environ:
        query += ['-file', os.environ['X509_USER_PROXY']]
    elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
        query += ['-file', os.environ['X509_USER_CERT']]
    query.append('-subject')
    subject = convert_str(subprocess.check_output(query, stderr=subprocess.PIPE))

    # Get the last non-numeric CN, stripping CN fields from the right.

    cn = ''
    while cn == '':
        pos = subject.rfind('/CN=')
        if pos < 0:
            break
        tail = subject[pos + 4:]
        subject = subject[:pos]
        cn = '' if tail.strip().isdigit() else tail

    # Truncate everything after the first '/'.

    cn = cn.split('/', 1)[0]

    # Truncate everything after the first newline.

    cn = cn.split('\n', 1)[0]

    # Truncate everything before the first ":" (UID:).

    cn = cn.split(':', 1)[-1]

    # Done (maybe).

    if cn != '':
        return cn

    # Something went wrong...

    raise RuntimeError('Unable to determine authenticated user.')
1079 
1080 
1081 # Function to check whether there is a running project.py process on this node
1082 # with the specified xml file and stage.
1083 #
1084 # This function works by checking the contents of /proc. Each process is checked
1085 # for the following properties.
1086 #
1087 # 1. Owned by same uid as this process.
1088 # 2. Command line.
1089 # a) project.py
1090 # b) Matching --xml option (exact match).
1091 # c) Matching --stage option (exact match).
1092 # d) --submit or --makeup option.
1093 #
1094 # Arguments xml and stage should be strings, and must match exactly command
1095 # line arguments.
def python.larbatch_utilities.gridftp_uri (   path)

Definition at line 768 of file larbatch_utilities.py.

def gridftp_uri(path):
    """Convert a pnfs path to a gridftp uri.

    Paths under /pnfs/ become 'gsiftp://<dcache server><dcache path>'.
    Any other path is returned unchanged.
    """
    if path.startswith('/pnfs/'):

        # dcache_path may yield nothing for a path that is already in
        # server form (/pnfs/fnal.gov/usr/...); use the path as is then
        # instead of failing on string concatenation.

        server_path = dcache_path(path) or path
        return 'gsiftp://' + dcache_server() + server_path
    return path
774 
775 
776 # Convert a pnfs path to srm uri.
777 # Return the input path unchanged if it isn't on dCache.
def python.larbatch_utilities.ifdh_chmod (   path,
  mode 
)

Definition at line 330 of file larbatch_utilities.py.

def ifdh_chmod(path, mode):
    """Run 'ifdh chmod' on path with a 60 second timeout.

    mode is an integer and is passed to ifdh formatted as octal.  A
    failure is reported with a printed warning and does not raise.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do chmod.

        cmd = ['ifdh', 'chmod', '%o' % mode, path]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()

        # First queue entry is the return code; the output is not needed.

        rc = q.get()
        if rc != 0:
            print('Warning: ifdh chmod failed for path %s' % path)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value

    # Done.

    return
373 
374 
375 # Ifdh mv, with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_cp (   source,
  destination 
)

Definition at line 92 of file larbatch_utilities.py.

92 
def ifdh_cp(source, destination):
    """Run 'ifdh cp source destination' with a (very long) timeout.

    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do copy.

        cmd = ['ifdh', 'cp', source, destination]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=31000000)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value
133 
134 
135 # Ifdh ls, with timeout.
136 # Return value is list of lines returned by "ifdh ls" command.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_ll (   path,
  depth 
)

Definition at line 186 of file larbatch_utilities.py.

def ifdh_ll(path, depth):
    """Run 'ifdh ll path depth' with a 60 second timeout.

    Returns the standard output of the command as a list of lines.
    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do listing.

        cmd = ['ifdh', 'll', path, '%d' % depth]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

        # Done.

        return jobout.splitlines()

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value
231 
232 
233 # Ifdh mkdir, with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_ls (   path,
  depth 
)

Definition at line 137 of file larbatch_utilities.py.

def ifdh_ls(path, depth):
    """Run 'ifdh ls path depth' with a 600 second timeout.

    Returns the standard output of the command as a list of lines.
    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do listing.

        cmd = ['ifdh', 'ls', path, '%d' % depth]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=600)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

        # Done.

        return jobout.splitlines()

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value
182 
183 
184 # Ifdh ll, with timeout.
185 # Return value is list of lines returned by "ifdh ll" command.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_mkdir (   path)

Definition at line 234 of file larbatch_utilities.py.

def ifdh_mkdir(path):
    """Run 'ifdh mkdir path' with a 60 second timeout.

    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do mkdir.

        cmd = ['ifdh', 'mkdir', path]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value

    # Done.

    return
279 
280 
281 # Ifdh rmdir, with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_mv (   src,
  dest 
)

Definition at line 376 of file larbatch_utilities.py.

def ifdh_mv(src, dest):
    """Run 'ifdh mv src dest' with a 60 second timeout.

    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do rename.

        cmd = ['ifdh', 'mv', src, dest]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value

    # Done.

    return
421 
422 
423 # Ifdh rm, with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_rm (   path)

Definition at line 424 of file larbatch_utilities.py.

def ifdh_rm(path):
    """Run 'ifdh rm path' with a 60 second timeout.

    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do delete.

        cmd = ['ifdh', 'rm', path]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value

    # Done.

    return
469 
470 
471 # Posix copy with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.ifdh_rmdir (   path)

Definition at line 282 of file larbatch_utilities.py.

def ifdh_rmdir(path):
    """Run 'ifdh rmdir path' with a 60 second timeout.

    Raises IFDHError if the command returns a nonzero status.
    """

    # Get proxy.

    test_proxy()

    # Make sure environment variables X509_USER_CERT and X509_USER_KEY
    # are not defined (they confuse ifdh, or rather the underlying tools).

    save_vars = {}
    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]
            del os.environ[var]

    try:

        # Do rmdir.

        cmd = ['ifdh', 'rmdir', path]
        jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            raise IFDHError(cmd, rc, jobout, joberr)

    finally:

        # Restore environment variables, even if something above raised.

        for var, value in save_vars.items():
            os.environ[var] = value

    # Done.

    return
327 
328 
329 # Ifdh chmod, with timeout.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
list
Definition: file_to_url.sh:28
def python.larbatch_utilities.nfs_server ( )

Definition at line 789 of file larbatch_utilities.py.

def nfs_server():
    """Return '<EXPERIMENT>gpvm01.fnal.gov'.

    The environment variable EXPERIMENT must be set; KeyError is raised
    otherwise.
    """
    experiment = os.environ['EXPERIMENT']
    return experiment + 'gpvm01.fnal.gov'
792 
793 
794 # Parse the ten-character file mode string as returned by "ls -l"
795 # and return mode bit mask.
def python.larbatch_utilities.parse_mode (   mode_str)

Definition at line 796 of file larbatch_utilities.py.

def parse_mode(mode_str):
    """Convert a ten-character "ls -l" mode string to a mode bit mask.

    Character 0 selects the file type; characters 1-9 are the user,
    group and world permission triads.  Unrecognized characters
    contribute no bits.  Raises IndexError if mode_str is shorter than
    ten characters.
    """

    # File type.

    file_types = {
        'b': stat.S_IFBLK,
        'c': stat.S_IFCHR,
        'd': stat.S_IFDIR,
        'l': stat.S_IFLNK,
        'p': stat.S_IFIFO,
        's': stat.S_IFSOCK,
        '-': stat.S_IFREG,
    }
    mode = file_types.get(mode_str[0], 0)

    # File permissions, as (position, character, bits).  At most one
    # entry per position can match.

    permissions = (

        # User triad (includes setuid).

        (1, 'r', stat.S_IRUSR),
        (2, 'w', stat.S_IWUSR),
        (3, 'x', stat.S_IXUSR),
        (3, 's', stat.S_ISUID + stat.S_IXUSR),
        (3, 'S', stat.S_ISUID),

        # Group triad (includes setgid).

        (4, 'r', stat.S_IRGRP),
        (5, 'w', stat.S_IWGRP),
        (6, 'x', stat.S_IXGRP),
        (6, 's', stat.S_ISGID + stat.S_IXGRP),
        (6, 'S', stat.S_ISGID),

        # World triad (includes sticky bit).

        (7, 'r', stat.S_IROTH),
        (8, 'w', stat.S_IWOTH),
        (9, 'x', stat.S_IXOTH),
        (9, 't', stat.S_ISVTX + stat.S_IXOTH),
        (9, 'T', stat.S_ISVTX),
    )
    for position, char, bits in permissions:
        if mode_str[position] == char:
            mode += bits

    # Done

    return mode
865 
866 # Function to return the current experiment.
867 # The following places for obtaining this information are
868 # tried (in order):
869 #
870 # 1. Environment variable $EXPERIMENT.
871 # 2. Environment variable $SAM_EXPERIMENT.
872 # 3. Hostname (up to "gpvm").
873 #
874 # Raise an exception if none of the above methods works.
875 #
def python.larbatch_utilities.posix_cp (   source,
  destination 
)

Definition at line 472 of file larbatch_utilities.py.

def posix_cp(source, destination):
    """Copy source to destination with 'cp', with a 600 second timeout.

    The copy runs in a forked buffer process so that a hanging,
    unkillable cp does not block the caller.  Raises IFDHError if the
    buffer process reports a nonzero status or is killed by a signal.
    """

    cmd = ['cp', source, destination]

    # Fork buffer process.

    buffer_pid = os.fork()
    if buffer_pid == 0:

        # In child process.  Whatever happens, the child must leave via
        # os._exit; otherwise an exception would let it continue running
        # the caller's code as a second process.

        status = 1
        try:

            # Launch cp subprocess.

            jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            q = queue.Queue()
            thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
            thread.start()
            thread.join(timeout=600)
            if thread.is_alive():

                # Subprocess did not finish (may be hanging and unkillable).
                # Try to kill the subprocess and exit process.
                # Unkillable process will become detached.

                print('Terminating subprocess.')
                jobinfo.kill()

            else:

                # Subprocess finished normally.  First queue entry is
                # the return code.

                status = q.get()

        finally:
            os._exit(status)

    else:

        # In parent process.
        # Wait for buffer subprocess to finish.

        status = os.waitpid(buffer_pid, 0)[1]

        # Decode the wait status properly.  Dividing by 256 yields a float
        # on python 3 and silently maps death by signal to zero on python 2.

        if os.WIFEXITED(status):
            rc = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            rc = -os.WTERMSIG(status)
        else:
            rc = 1
        if rc != 0:
            raise IFDHError(cmd, rc, '', '')

    # Done.

    return
523 
524 
525 # Function to wait for a subprocess to finish and fetch return code,
526 # standard output, and standard error.
527 # Call this function like this:
528 #
529 # q = Queue.Queue()
530 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
531 # wait_for_subprocess(jobinfo, q, input)
532 # rc = q.get() # Return code.
533 # jobout = q.get() # Standard output
534 # joberr = q.get() # Standard error
535 
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def python.larbatch_utilities.srm_uri (   path)

Definition at line 778 of file larbatch_utilities.py.

def srm_uri(path):
    """Convert a pnfs path to an srm uri.

    Both the local form (/pnfs/<rest>) and the server form
    (/pnfs/fnal.gov/usr/<rest>) map to the same uri.  Any path that is not
    under /pnfs/ is returned unchanged.
    """
    endpoint = 'srm://fndca1.fnal.gov:8443/srm/managerv2?SFN='
    server_prefix = '/pnfs/fnal.gov/usr/'

    # Already in server form: do not insert the prefix a second time.

    if path.startswith(server_prefix):
        return endpoint + path
    if path.startswith('/pnfs/'):
        return endpoint + server_prefix + path[6:]
    return path
784 
785 
786 # Return the name of a computer with login access that has the /pnfs
787 # filesystem nfs-mounted. This function makes use of the $EXPERIMENT
788 # environment variable (as does ifdh), which must be set.
def python.larbatch_utilities.test_jobsub ( )

Definition at line 710 of file larbatch_utilities.py.

def test_jobsub():
    """Check that the jobsub_submit command is on the execution path.

    Sets the module-level flag jobsub_ok on success, so the check is made
    only once.  If the command is not found, prints a message and calls
    sys.exit(1).  Returns jobsub_ok.
    """

    global jobsub_ok
    if not jobsub_ok:

        # Look for command jobsub_submit on execution path.

        try:
            jobinfo = subprocess.Popen(['which', 'jobsub_submit'],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            jobout = jobinfo.communicate()[0]
            lines = convert_str(jobout).splitlines()

            # Empty output means the command was not found.

            if lines and lines[0].strip() != '':
                jobsub_ok = True
        except (OSError, ValueError):

            # 'which' could not be run or printed undecodable output;
            # treat as not found.

            pass

    if not jobsub_ok:
        print('Please set up jobsub_client')
        sys.exit(1)

    return jobsub_ok
735 
736 # Return dCache server.
do one_file $F done echo for F in find $TOP name CMakeLists txt print
def python.larbatch_utilities.test_kca ( )

Definition at line 630 of file larbatch_utilities.py.

def _require_kca_cert():
    # Run `voms-proxy-info ... -exists` on the credential selected by the
    # environment.  Raises (via check_call) if the command fails.
    cmd = ['voms-proxy-info']
    if 'X509_USER_PROXY' in os.environ:
        cmd.extend(['-file', os.environ['X509_USER_PROXY']])
    elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
        cmd.extend(['-file', os.environ['X509_USER_CERT']])
    cmd.append('-exists')
    subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def test_kca():
    """Test whether the user has a kca certificate.  If not, try to get one.

    The credential that is checked is chosen from the environment:
    $X509_USER_PROXY if set, else $X509_USER_CERT (when $X509_USER_KEY is
    also set), else the voms-proxy-info default.  A positive result is
    remembered in the module-level flag kca_ok.

    Returns the value of kca_ok (true on success).
    Raises RuntimeError if no certificate is found even after calling
    get_kca().
    """
    global kca_ok
    if not kca_ok:
        try:
            _require_kca_cert()

            # Workaround jobsub bug by setting environment variable X509_USER_PROXY to
            # point to the default location of the kca certificate.

            x509_path = convert_str(subprocess.check_output(['voms-proxy-info', '-path'],
                                                            stderr=subprocess.PIPE))
            os.environ['X509_USER_PROXY'] = x509_path.strip()

            kca_ok = True
        except Exception:

            # No usable certificate yet; fall through and try to get one.

            pass

    # If at this point we don't have a kca certificate, try to get one.

    if not kca_ok:
        get_kca()

    # Final checkout.
    # NOTE(review): get_kca() is defined elsewhere; the check is repeated here
    # whenever kca_ok is still false after that call.

    if not kca_ok:
        try:
            _require_kca_cert()
            kca_ok = True
        except Exception:
            raise RuntimeError('Please get a kca certificate.')
    return kca_ok
679 
680 
681 # Test whether user has a valid grid proxy. If not, try to get a new one.
def python.larbatch_utilities.test_proxy ( )

Definition at line 682 of file larbatch_utilities.py.

def _require_grid_proxy():
    # Run the two voms-proxy-info checks used to validate a grid proxy.
    # Raises (via check_call) if either command fails.
    subprocess.check_call(['voms-proxy-info', '-exists'],
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'],
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def test_proxy():
    """Test whether the user has a valid grid proxy.  If not, try to get one.

    A positive result is remembered in the module-level flag proxy_ok.

    Returns the value of proxy_ok (true on success).
    Raises RuntimeError if the proxy checks still fail after calling
    get_proxy().
    """
    global proxy_ok
    if not proxy_ok:
        try:
            _require_grid_proxy()
            proxy_ok = True
        except Exception:

            # No usable proxy yet; fall through and try to get one.

            pass

    # If at this point we don't have a grid proxy, try to get one.

    if not proxy_ok:
        get_proxy()

    # Final checkout.

    if not proxy_ok:
        try:
            _require_grid_proxy()
            proxy_ok = True
        except Exception:
            raise RuntimeError('Please get a grid proxy.')
    return proxy_ok
708 
709 # Test whether jobsub_client has been set up.
def python.larbatch_utilities.test_ticket ( )

Definition at line 547 of file larbatch_utilities.py.

def test_ticket():
    """Test whether the user has a valid kerberos ticket.

    Runs `klist -s` and requires a zero exit status.  A positive result is
    remembered in the module-level flag ticket_ok, so klist is not run again
    on later calls.

    Returns the value of ticket_ok.
    Raises RuntimeError if klist exits with a non-zero status.
    """
    global ticket_ok

    # Already verified earlier in this process.
    if ticket_ok:
        return ticket_ok

    status = subprocess.call(['klist', '-s'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    if status != 0:
        raise RuntimeError('Please get a kerberos ticket.')

    ticket_ok = True
    return ticket_ok
556 
557 
558 # Get kca certificate.
def python.larbatch_utilities.wait_for_subprocess (   jobinfo,
  q,
  input = None 
)

Definition at line 536 of file larbatch_utilities.py.

def wait_for_subprocess(jobinfo, q, input=None):
    """Wait for a subprocess to finish and report its results through a queue.

    jobinfo - Object providing communicate() and poll(), e.g. a
              subprocess.Popen instance.
    q       - Queue-like object; receives three items via put(), in order:
              return code, standard output, standard error.
    input   - Optional data passed to jobinfo.communicate().
    """
    stdout_data, stderr_data = jobinfo.communicate(input)

    # communicate() has returned, so poll() now reports the return code.
    for item in (jobinfo.poll(), stdout_data, stderr_data):
        q.put(item)
544 
545 
546 # Test whether user has a valid kerberos ticket. Raise exception if not.
def python.larbatch_utilities.xrootd_server_port ( )

Definition at line 751 of file larbatch_utilities.py.

def xrootd_server_port():
    """Return the xrootd server and port as a single 'server:port' string.

    The server name comes from dcache_server(); the port is fixed at 1094.
    """
    server = dcache_server()
    return server + ':1094'
754 
755 
756 # Convert a pnfs path to xrootd uri.
757 # Return the input path unchanged if it isn't on dCache.
def python.larbatch_utilities.xrootd_uri (   path)

Definition at line 758 of file larbatch_utilities.py.

def xrootd_uri(path):
    """Convert a pnfs path to an xrootd uri.

    A path that does not start with '/pnfs/' is returned unchanged.
    """
    if not path.startswith('/pnfs/'):
        return path

    # Build 'root://<server:port><dcache path>'.
    server_port = xrootd_server_port()
    return 'root://' + server_port + dcache_path(path)
764 
765 
766 # Convert a pnfs path to gridftp uri.
767 # Return the input path unchanged if it isn't on dCache.

Variable Documentation

python.larbatch_utilities.jobsub_ok = False

Definition at line 88 of file larbatch_utilities.py.

python.larbatch_utilities.kca_ok = False

Definition at line 85 of file larbatch_utilities.py.

string python.larbatch_utilities.kca_user = ''

Definition at line 87 of file larbatch_utilities.py.

python.larbatch_utilities.proxy_ok = False

Definition at line 86 of file larbatch_utilities.py.

python.larbatch_utilities.ticket_ok = False

Definition at line 84 of file larbatch_utilities.py.