All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SAMQuerySource.cxx
Go to the documentation of this file.
2 
5 
6 #include "ifdh.h"
7 
8 #include <cassert>
9 #include <iostream>
10 #include <map>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 
14 #include "TString.h"
15 
16 namespace ana
17 {
18  //----------------------------------------------------------------------
19  SAMQuerySource::SAMQuerySource(const std::string& query,
20  int stride, int offset)
21  // Stride and offset already taken account of in the query
22  : FileListSource(LocationsForSAMQuery(query, stride, offset), 1, 0)
23  {
24  }
25 
26  //----------------------------------------------------------------------
28  {
29  }
30 
31  //----------------------------------------------------------------------
32  std::string SAMQuerySource::EnsureDataset(const std::string& query) const
33  {
34  const char* user = getenv("GRID_USER");
35  assert(user);
36 
37  TString dset = TString::Format("%s_cafana_%s", user, query.c_str());
38  // Sanitize various special characters that can appear in queries
39  dset.ReplaceAll(" ", "_");
40  dset.ReplaceAll("(", "_OPEN_");
41  dset.ReplaceAll(")", "_CLOSE_");
42  dset.ReplaceAll(":", "_COLON_");
43  dset.ReplaceAll("'", "_SQUOTE_");
44  dset.ReplaceAll("\"", "_DQUOTE_");
45 
46  std::cout << "Creating dataset " << dset << " for query " << query << std::endl;
47 
48  // I would be much much happier to do this in proper code, but I'm not sure
49  // how, there's no samweb C++ API?
50  system(TString::Format("samweb -e %s list-definitions --defname %s | grep %s || samweb -e %s create-definition %s %s",
51  SAMExperiment().c_str(),
52  dset.Data(),
53  dset.Data(),
54  SAMExperiment().c_str(),
55  dset.Data(),
56  query.c_str()).Data());
57 
58  return dset.Data();
59  }
60 
61  //----------------------------------------------------------------------
62  std::string SAMQuerySource::EnsureSnapshot(const std::string& def) const
63  {
64  const char* user = getenv("GRID_USER");
65  assert(user);
66  const char* cluster = getenv("CLUSTER");
67  assert(cluster);
68  const char* process = getenv("PROCESS");
69  assert(process);
70 
71  // Jobs in the same cluster should share the same snapshot of the dataset
72  // so as not to hammer SAM with multiple requests for the same file list,
73  // but so that the dataset snapshot is updated with every new submission.
74  const std::string snap = TString::Format("%s_cafana_snap_%s_%s",
75  user, def.c_str(), cluster).Data();
76 
77  // I'd love to do all this with a proper API, but samweb doesn't seem to
78  // have a C++ one? So we get this stew of system() calls...
79 
80  // Use this name as an indication that someone is working on creating the
81  // snapshot and every one else should stand by.
82  const std::string snaplock = TString::Format("%s_cafana_snap_lock_%s_%s",
83  user, def.c_str(), cluster).Data();
84 
85  // Try to create the lock. Success means we have to create the snapshot,
86  // failure means someone else is working on it. The content of the
87  // definition (the nova.special) doesn't matter, except it has to be unique
88  // between the jobs, because trying to create an exact duplicate of an
89  // existing definition counts as success.
90  std::cout << "Checking lock " << snaplock << std::endl;
91  if(system(TString::Format("samweb -e %s create-definition %s file_name %s",
92  SAMExperiment().c_str(),
93  snaplock.c_str(),
94  process).Data()) == 0){
95  // No one took the lock, it's up to us. Make the actual snapshot
96  std::cout << "Snapshotting " << def << " as " << snap << std::endl;
97  system(TString::Format("samweb -e %s take-snapshot %s | xargs samweb -e %s create-definition %s snapshot_id",
98  SAMExperiment().c_str(),
99  def.c_str(),
100  SAMExperiment().c_str(),
101  snap.c_str()).Data());
102  }
103  else{
104  // Lock already exists, just wait for the real snapshot to be created
105  double period = 1;
106  while(system(TString::Format("samweb -e %s list-definitions --defname %s | grep %s",
107  SAMExperiment().c_str(),
108  snap.c_str(),
109  snap.c_str()).Data()) != 0){
110  sleep(int(period));
111  period *= 1.5;
112  if(period > 60*30){
113  std::cout << "We've been waiting a real long time for " << snap << " to be created. I don't think it's happening." << std::endl;
114  abort();
115  }
116  }
117  }
118  std::cout << "Will use " << snap << std::endl;
119 
120  return snap;
121  }
122 
123  //----------------------------------------------------------------------
124  std::string SAMQuerySource::IFDHBaseURI() const
125  {
126  // This is the same logic as implemented in the default ifdh() constructor,
127  // but using $SAM_EXPERIMENT in place of $EXPERIMENT
128 
129  // Allow user to override to arbitrary value
130  if(getenv("IFDH_BASE_URI")) return getenv("IFDH_BASE_URI");
131 
132  return ifdh::_default_base_uri + SAMExperiment() + "/api";
133  }
134 
135  //----------------------------------------------------------------------
136  std::vector<std::string> SAMQuerySource::
137  LocationsForSAMQuery(const std::string& str, int stride, int offset)
138  {
139  TString query = str;
140 
141  // This is an actual query
142  if(query.Contains(' ') && RunningOnGrid()){
143  // On the grid we want to convert that to a dataset we can snapshot below
144  query = EnsureDataset(query.Data());
145  }
146 
147  // This is a dataset name
148  if(!query.Contains(' ')){
149  if(getenv("CAFANA_USE_SNAPSHOTS")){
150  query = "dataset_def_name_newest_snapshot "+query;
151  }
152  else{
153  // Take one snapshot between all the jobs and share that
154  if(RunningOnGrid()) query = EnsureSnapshot(query.Data());
155 
156  query = "defname: "+query;
157  }
158  }
159 
160  if(stride > 1){
161  query += TString::Format(" with stride %d", stride).Data();
162  if(offset > 0){
163  query += TString::Format(" offset %d", offset).Data();
164  }
165  }
166 
167 
168  std::cout << "Looking up files matching '" << query << "' using SAM...\n";
169 
170  std::vector<std::string> files;
171 
172  ifdh i(IFDHBaseURI());
173  i.set_debug("0"); // shut up
174  try{
175  files = i.translateConstraints(query.Data());
176  }
177  catch(ifdh_util_ns::WebAPIException& e){
178  // I like my error message below better, since this could well be a
179  // mistyped filename.
180  }
181 
182  if(files.empty()){
183  std::cerr << "\nCan't find any files matching '" << str
184  << "'. Aborting..." << std::endl;
185  abort();
186  }
187 
188  // IFDH likes to give back an empty string as the last response
189  // https://cdcvs.fnal.gov/redmine/issues/6718
190  if(!files.empty() && files.back().empty()){
191  files.pop_back();
192  }
193 
194  return LocateSAMFiles(files);
195  }
196 
197  //----------------------------------------------------------------------
198  std::vector<std::string> SAMQuerySource::
199  LocateSAMFiles(const std::vector<std::string>& fnames)
200  {
201  std::vector<std::string> ret;
202 
203  // We're going to fill this map of locations for all the files
204  std::map<std::string, std::vector<std::string>> locmap;
205 
206  Progress prog(TString::Format("Looking up locations of %ld files using SAM", fnames.size()).Data());
207 
208  ifdh i(IFDHBaseURI());
209  i.set_debug("0"); // shut up
210 
211  // locateFiles() saves the roundtrip time of talking to the server about
212  // every file individually, but it seems to bog down for large
213  // queries. Split the query into chunks. Experimentally this is about the
214  // sweet spot.
215  const unsigned int kStep = 50;
216  for(unsigned int fIdx = 0; fIdx < fnames.size(); fIdx += kStep){
217  prog.SetProgress(double(fIdx)/fnames.size());
218 
219  // The files we're looking up right now. Careful not to run off the end
220  // of the vector.
221  const std::vector<std::string> fslice(fnames.begin()+fIdx, fIdx+kStep < fnames.size() ? fnames.begin()+fIdx+kStep : fnames.end());
222 
223  const auto locslice = i.locateFiles(fslice);
224 
225  locmap.insert(locslice.begin(), locslice.end());
226  }
227 
228  prog.Done();
229 
230 
231  // Now go through the map and pick our favourite location for each file,
232  // and do some cleanup.
233  for(auto it: locmap){
234  const std::string& f = it.first;
235  const std::vector<std::string>& locs = it.second;
236 
237  int best = 0;
238 
239  std::string resolved;
240  for(TString loc: locs){
241  // Never try to access bluearc locations from the grid
242  if(!RunningOnGrid() && loc.BeginsWith("novadata:") && best < 3){
243  loc.Remove(0, 9);
244 
245  // Rewrite FNAL bluearc paths so users with matching directory
246  // structures offsite can access their local copies.
247  if(std::getenv("NOVA_ANA" )) loc.ReplaceAll("/nova/ana", std::getenv("NOVA_ANA"));
248  if(std::getenv("NOVA_APP" )) loc.ReplaceAll("/nova/app", std::getenv("NOVA_APP"));
249  if(std::getenv("NOVA_DATA")) loc.ReplaceAll("/nova/data", std::getenv("NOVA_DATA"));
250  if(std::getenv("NOVA_PROD")) loc.ReplaceAll("/nova/prod", std::getenv("NOVA_PROD"));
251 
252  // Check if the file exists at that location. If not, maybe pnfs has
253  // it.
254  struct stat junk;
255  if(stat((resolved+'/'+f).c_str(), &junk) == 0){
256  best = 3; // Prefer bluearc locations
257  resolved = loc;
258  }
259  }
260 
261  if(loc.BeginsWith("dcache:") && best < 2){
262  // Otherwise, used xrootd. Prefer "dache:" to "enstore:" because
263  // "dcache:" probably means /pnfs/nova/persistent/ so no chance of a
264  // huge wait for tape.
265  best = 2;
266 
267  // FileListSource does the actual conversion to xrootd
268  loc.ReplaceAll("dcache:/pnfs/", "/pnfs/");
269  // Strip the bit in brackets from the end
270  if(loc.First('(') >= 0) loc.Resize(loc.First('('));
271  resolved = loc;
272  }
273 
274  if(loc.BeginsWith("enstore:") && best < 1){
275  best = 1;
276 
277  loc.ReplaceAll("enstore:/pnfs/", "/pnfs/");
278  if(loc.First('(') >= 0) loc.Resize(loc.First('('));
279  resolved = loc;
280  }
281 
282  } // end for loc
283 
284  if(best == 0 || resolved.empty()){
285  std::cerr << "\nCouldn't find a usable location for " << f
286  << "'. Aborting..." << std::endl;
287  abort();
288  }
289 
290  ret.push_back((resolved+'/'+f));
291  } // end for fIdx
292 
293  return ret;
294  }
295 }
std::string IFDHBaseURI() const
BEGIN_PROLOG TPC Trig offset(g4 rise time) ProjectToHeight
Definition: CORSIKAGen.fcl:7
BEGIN_PROLOG could also be cerr
process_name cluster
Definition: cheaterreco.fcl:51
process_name opflashCryoW ana
std::string EnsureDataset(const std::string &query) const
std::vector< std::string > LocationsForSAMQuery(const std::string &str, int stride, int offset)
SAMQuerySource(const std::string &query, int stride=-1, int offset=-1)
std::string EnsureSnapshot(const std::string &def) const
Simple file source based on an explicit list provided by the user.
std::vector< std::string > LocateSAMFiles(const std::vector< std::string > &fnames)
Take filenames, return locations suitable for TFile::Open()
std::string SAMExperiment()
$SAM_EXPERIMENT or a nice error message and abort
do i e
A simple ascii-art progress bar.
Definition: Progress.h:9
prog
Definition: just_cmake.sh:3
BEGIN_PROLOG could also be cout