ana::SAMQuerySource Class Reference

File source based on a SAM query or dataset (definition).

#include <SAMQuerySource.h>

Inheritance diagram for ana::SAMQuerySource:
ana::SAMQuerySource inherits ana::FileListSource, which inherits ana::IFileSource.

Public Member Functions

 SAMQuerySource (const std::string &query, int stride=-1, int offset=-1)
 
virtual ~SAMQuerySource ()
 
- Public Member Functions inherited from ana::FileListSource
 FileListSource (const std::vector< std::string > &files, int stride=-1, int offset=-1)
 Default offset and stride (-1) mean: obey the command-line options.
 
virtual ~FileListSource ()
 
virtual TFile * GetNextFile () override
 Returns the next file in sequence, ready for reading.
 
int NFiles () const override
 May return -1, indicating that the number of files is not known.
 
- Public Member Functions inherited from ana::IFileSource
virtual ~IFileSource ()
 

Protected Member Functions

std::vector< std::string > LocationsForSAMQuery (const std::string &str, int stride, int offset)
 
std::vector< std::string > LocateSAMFiles (const std::vector< std::string > &fnames)
 Take filenames, return locations suitable for TFile::Open().
 
std::string IFDHBaseURI () const
 
std::string EnsureDataset (const std::string &query) const
 
std::string EnsureSnapshot (const std::string &def) const
 

Additional Inherited Members

- Protected Attributes inherited from ana::FileListSource
std::vector< std::string > fFileNames
 The list of files.
 
std::vector< std::string >::iterator fIt
 Iterator into fFileNames.
 
int fStride
 
int fN
 Number of files that will actually be returned.
 
TFile * fFile
 The most-recently-returned file.
 
- Static Protected Attributes inherited from ana::FileListSource
static bool fgGotTickets = false
 Have we renewed our tickets?
 

Detailed Description

File source based on a SAM query or dataset (definition)

Locates the files on BlueArc or under /pnfs (dCache or Enstore). BlueArc copies are preferred, but are never used when running on the grid.

Definition at line 8 of file SAMQuerySource.h.

Constructor & Destructor Documentation

ana::SAMQuerySource::SAMQuerySource ( const std::string &  query,
int  stride = -1,
int  offset = -1 
)
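Parameters
query   May be a SAM dataset name or a SAM query string (a string containing spaces is treated as a query)
stride  If greater than 1, appended to the SAM query as "with stride N"
offset  Appended to the SAM query as "offset M" when stride is greater than 1 and offset is greater than 0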

Definition at line 19 of file SAMQuerySource.cxx.

  : FileListSource(LocationsForSAMQuery(query, stride, offset), 1, 0)
{
}
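Because the stride and offset have already been applied by SAM inside LocationsForSAMQuery(), the constructor passes a stride of 1 and an offset of 0 to FileListSource, presumably so that the file list is not thinned a second time. The sketch below illustrates that effect with a hypothetical helper, `ApplyStride`, which assumes (for illustration only) that SAM's `with stride N offset M` keeps every N-th file starting at index M; it is not SAM or CAFAna code.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: keep every `stride`-th entry, starting at `offset`.
// This is an assumed model of SAM's "with stride N offset M", not SAM's code.
std::vector<std::string> ApplyStride(const std::vector<std::string>& files,
                                     int stride, int offset)
{
  if(stride <= 1) return files;   // stride of 1 (or unset) keeps everything
  if(offset < 0) offset = 0;      // unset offset starts at the first file

  std::vector<std::string> ret;
  for(std::size_t i = static_cast<std::size_t>(offset); i < files.size();
      i += static_cast<std::size_t>(stride)){
    ret.push_back(files[i]);
  }
  return ret;
}
```

With ten files, `ApplyStride(files, 3, 1)` keeps three of them; striding that result again with the same parameters keeps only one.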
ana::SAMQuerySource::~SAMQuerySource ( )
virtual

Definition at line 27 of file SAMQuerySource.cxx.

{
}

Member Function Documentation

std::string ana::SAMQuerySource::EnsureDataset ( const std::string &  query) const
protected

Definition at line 32 of file SAMQuerySource.cxx.

{
  const char* user = getenv("GRID_USER");
  assert(user);

  TString dset = TString::Format("%s_cafana_%s", user, query.c_str());
  // Sanitize various special characters that can appear in queries
  dset.ReplaceAll(" ", "_");
  dset.ReplaceAll("(", "_OPEN_");
  dset.ReplaceAll(")", "_CLOSE_");
  dset.ReplaceAll(":", "_COLON_");
  dset.ReplaceAll("'", "_SQUOTE_");
  dset.ReplaceAll("\"", "_DQUOTE_");

  std::cout << "Creating dataset " << dset << " for query " << query << std::endl;

  // I would be much happier to do this in proper code, but I'm not sure
  // how; there's no samweb C++ API?
  system(TString::Format("samweb -e %s list-definitions --defname %s | grep %s || samweb -e %s create-definition %s %s",
                         SAMExperiment().c_str(),
                         dset.Data(),
                         dset.Data(),
                         SAMExperiment().c_str(),
                         dset.Data(),
                         query.c_str()).Data());

  return dset.Data();
}
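The dataset name built by EnsureDataset() is a deterministic function of $GRID_USER and the query text, so repeated calls with the same query map onto the same SAM definition, and the `list-definitions ... | grep ... || create-definition ...` command only creates it when it is not already listed. A minimal standalone sketch of the naming step, using plain `std::string` in place of `TString` (both function names are hypothetical):

```cpp
#include <cstddef>
#include <string>

// Replace every occurrence of `from` in `s` with `to`, similar in spirit to
// TString::ReplaceAll. `from` must be non-empty.
void ReplaceAll(std::string& s, const std::string& from, const std::string& to)
{
  for(std::size_t pos = s.find(from); pos != std::string::npos;
      pos = s.find(from, pos + to.size())){
    s.replace(pos, from.size(), to);
  }
}

// Hypothetical standalone version of the naming step in EnsureDataset():
// the same substitutions, applied in the same order.
std::string SanitizedDatasetName(const std::string& user, const std::string& query)
{
  std::string dset = user + "_cafana_" + query;
  ReplaceAll(dset, " ", "_");
  ReplaceAll(dset, "(", "_OPEN_");
  ReplaceAll(dset, ")", "_CLOSE_");
  ReplaceAll(dset, ":", "_COLON_");
  ReplaceAll(dset, "'", "_SQUOTE_");
  ReplaceAll(dset, "\"", "_DQUOTE_");
  return dset;
}
```

For example, user `alice` with query `a (b:c)` gives `alice_cafana_a__OPEN_b_COLON_c_CLOSE_`.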
std::string ana::SAMQuerySource::EnsureSnapshot ( const std::string &  def) const
protected

Definition at line 62 of file SAMQuerySource.cxx.

{
  const char* user = getenv("GRID_USER");
  assert(user);
  const char* cluster = getenv("CLUSTER");
  assert(cluster);
  const char* process = getenv("PROCESS");
  assert(process);

  // Jobs in the same cluster should share the same snapshot of the dataset,
  // so as not to hammer SAM with multiple requests for the same file list,
  // but the snapshot should be refreshed with every new submission.
  const std::string snap = TString::Format("%s_cafana_snap_%s_%s",
                                           user, def.c_str(), cluster).Data();

  // I'd love to do all this with a proper API, but samweb doesn't seem to
  // have a C++ one? So we get this stew of system() calls...

  // Use this name as an indication that someone is working on creating the
  // snapshot and everyone else should stand by.
  const std::string snaplock = TString::Format("%s_cafana_snap_lock_%s_%s",
                                               user, def.c_str(), cluster).Data();

  // Try to create the lock. Success means we have to create the snapshot,
  // failure means someone else is working on it. The content of the
  // definition (here "file_name $PROCESS") doesn't matter, except that it
  // has to be unique between the jobs, because trying to create an exact
  // duplicate of an existing definition counts as success.
  std::cout << "Checking lock " << snaplock << std::endl;
  if(system(TString::Format("samweb -e %s create-definition %s file_name %s",
                            SAMExperiment().c_str(),
                            snaplock.c_str(),
                            process).Data()) == 0){
    // No one took the lock, it's up to us. Make the actual snapshot
    std::cout << "Snapshotting " << def << " as " << snap << std::endl;
    system(TString::Format("samweb -e %s take-snapshot %s | xargs samweb -e %s create-definition %s snapshot_id",
                           SAMExperiment().c_str(),
                           def.c_str(),
                           SAMExperiment().c_str(),
                           snap.c_str()).Data());
  }
  else{
    // Lock already exists, just wait for the real snapshot to be created
    double period = 1;
    while(system(TString::Format("samweb -e %s list-definitions --defname %s | grep %s",
                                 SAMExperiment().c_str(),
                                 snap.c_str(),
                                 snap.c_str()).Data()) != 0){
      sleep(int(period));
      period *= 1.5;
      if(period > 60*30){
        std::cout << "We've been waiting a real long time for " << snap << " to be created. I don't think it's happening." << std::endl;
        abort();
      }
    }
  }
  std::cout << "Will use " << snap << std::endl;

  return snap;
}
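The locking scheme in EnsureSnapshot() relies on the behaviour described in its comments: creating a definition succeeds if the name is new, or if an identical definition already exists. The sketch below models that with a hypothetical in-memory `DefinitionRegistry` (not a samweb API) to show why each job must give the lock distinct content ($PROCESS): only the first job to create the lock sees success.

```cpp
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the SAM definition namespace, modelling
// the rule described in EnsureSnapshot(): creation succeeds for a new name,
// and also "succeeds" for an exact duplicate of an existing definition.
class DefinitionRegistry
{
public:
  bool Create(const std::string& name, const std::string& dims)
  {
    const auto it = fDefs.find(name);
    if(it == fDefs.end()){
      fDefs[name] = dims;
      return true;
    }
    return it->second == dims; // exact duplicate counts as success
  }

private:
  std::map<std::string, std::string> fDefs;
};

// Try to take the snapshot lock for one job. `process` makes the lock's
// content unique per job, so a second job's attempt is not a duplicate.
bool TryTakeLock(DefinitionRegistry& reg, const std::string& lockName,
                 const std::string& process)
{
  return reg.Create(lockName, "file_name " + process);
}
```

If every job used the same lock content, each `Create()` after the first would be treated as a duplicate and also succeed, so every job would go on to take its own snapshot.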
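If the lock is already held, EnsureSnapshot() polls for the snapshot definition with a geometric backoff. The hypothetical helper below reproduces only the sleep schedule of that loop (no SAM calls), which makes it easy to see how long a job waits before giving up:

```cpp
#include <vector>

// Hypothetical helper: the sleep durations (seconds) that the wait loop in
// EnsureSnapshot() would use if the snapshot never appears. Mirrors the loop:
// start at 1 s, sleep int(period), multiply by 1.5, stop once period > 30 min.
std::vector<int> SnapshotWaitSchedule()
{
  std::vector<int> sleeps;
  double period = 1;
  while(true){
    sleeps.push_back(int(period));
    period *= 1.5;
    if(period > 60*30) break; // the real code calls abort() at this point
  }
  return sleeps;
}
```

The schedule has 19 sleeps totalling 4422 seconds (about 74 minutes). Note that the `60*30` limit applies to the length of a single sleep interval, not to the cumulative waiting time.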
std::string ana::SAMQuerySource::IFDHBaseURI ( ) const
protected

Definition at line 124 of file SAMQuerySource.cxx.

{
  // This is the same logic as implemented in the default ifdh() constructor,
  // but using $SAM_EXPERIMENT in place of $EXPERIMENT

  // Allow user to override to arbitrary value
  if(getenv("IFDH_BASE_URI")) return getenv("IFDH_BASE_URI");

  return ifdh::_default_base_uri + SAMExperiment() + "/api";
}
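IFDHBaseURI() gives $IFDH_BASE_URI priority and otherwise builds the URI from ifdh::_default_base_uri and $SAM_EXPERIMENT. A testable sketch of that lookup order, with the environment value and the default base passed in as parameters (the function name is hypothetical, and the real default base is whatever ifdh defines):

```cpp
#include <string>

// Hypothetical sketch of the lookup order in IFDHBaseURI().
//   envOverride: value of $IFDH_BASE_URI, or nullptr if it is unset
//   defaultBase: stands in for ifdh::_default_base_uri (value not shown here)
//   experiment:  stands in for SAMExperiment(), i.e. $SAM_EXPERIMENT
std::string BaseURI(const char* envOverride, const std::string& defaultBase,
                    const std::string& experiment)
{
  // An explicit override wins, whatever it contains
  if(envOverride) return envOverride;
  return defaultBase + experiment + "/api";
}
```

In practice the first argument would be `std::getenv("IFDH_BASE_URI")`, which returns a null pointer when the variable is unset.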
std::vector< std::string > ana::SAMQuerySource::LocateSAMFiles ( const std::vector< std::string > &  fnames)
protected

Take filenames, return locations suitable for TFile::Open()

Definition at line 199 of file SAMQuerySource.cxx.

{
  std::vector<std::string> ret;

  // We're going to fill this map of locations for all the files
  std::map<std::string, std::vector<std::string>> locmap;

  Progress prog(TString::Format("Looking up locations of %zu files using SAM", fnames.size()).Data());

  ifdh i(IFDHBaseURI());
  i.set_debug("0"); // shut up

  // locateFiles() saves the roundtrip time of talking to the server about
  // every file individually, but it seems to bog down for large
  // queries. Split the query into chunks. Experimentally this is about the
  // sweet spot.
  const unsigned int kStep = 50;
  for(unsigned int fIdx = 0; fIdx < fnames.size(); fIdx += kStep){
    prog.SetProgress(double(fIdx)/fnames.size());

    // The files we're looking up right now. Careful not to run off the end
    // of the vector.
    const std::vector<std::string> fslice(fnames.begin()+fIdx, fIdx+kStep < fnames.size() ? fnames.begin()+fIdx+kStep : fnames.end());

    const auto locslice = i.locateFiles(fslice);

    locmap.insert(locslice.begin(), locslice.end());
  }

  prog.Done();

  // Now go through the map and pick our favourite location for each file,
  // and do some cleanup.
  for(const auto& it: locmap){
    const std::string& f = it.first;
    const std::vector<std::string>& locs = it.second;

    int best = 0;

    std::string resolved;
    for(TString loc: locs){
      // Never try to access bluearc locations from the grid
      if(!RunningOnGrid() && loc.BeginsWith("novadata:") && best < 3){
        loc.Remove(0, 9);

        // Rewrite FNAL bluearc paths so users with matching directory
        // structures offsite can access their local copies.
        if(std::getenv("NOVA_ANA" )) loc.ReplaceAll("/nova/ana", std::getenv("NOVA_ANA"));
        if(std::getenv("NOVA_APP" )) loc.ReplaceAll("/nova/app", std::getenv("NOVA_APP"));
        if(std::getenv("NOVA_DATA")) loc.ReplaceAll("/nova/data", std::getenv("NOVA_DATA"));
        if(std::getenv("NOVA_PROD")) loc.ReplaceAll("/nova/prod", std::getenv("NOVA_PROD"));

        // Check if the file exists at this candidate location (not at the
        // previously resolved one). If not, maybe pnfs has it.
        struct stat junk;
        if(stat((std::string(loc.Data())+'/'+f).c_str(), &junk) == 0){
          best = 3; // Prefer bluearc locations
          resolved = loc;
        }
      }

      if(loc.BeginsWith("dcache:") && best < 2){
        // Otherwise, use xrootd. Prefer "dcache:" to "enstore:" because
        // "dcache:" probably means /pnfs/nova/persistent/ so no chance of a
        // huge wait for tape.
        best = 2;

        // FileListSource does the actual conversion to xrootd
        loc.ReplaceAll("dcache:/pnfs/", "/pnfs/");
        // Strip the bit in brackets from the end
        if(loc.First('(') >= 0) loc.Resize(loc.First('('));
        resolved = loc;
      }

      if(loc.BeginsWith("enstore:") && best < 1){
        best = 1;

        loc.ReplaceAll("enstore:/pnfs/", "/pnfs/");
        if(loc.First('(') >= 0) loc.Resize(loc.First('('));
        resolved = loc;
      }

    } // end for loc

    if(best == 0 || resolved.empty()){
      std::cerr << "\nCouldn't find a usable location for '" << f
                << "'. Aborting..." << std::endl;
      abort();
    }

    ret.push_back(resolved+'/'+f);
  } // end for locmap

  return ret;
}
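LocateSAMFiles() sends its lookups to ifdh::locateFiles() in batches of kStep = 50 names. The slicing logic, written as a hypothetical standalone helper with `std::min` in place of the ternary expression:

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: split `fnames` into consecutive slices of at most
// `step` entries (step must be > 0), like the batching in LocateSAMFiles().
std::vector<std::vector<std::string>> Chunk(const std::vector<std::string>& fnames,
                                            std::size_t step)
{
  std::vector<std::vector<std::string>> chunks;
  for(std::size_t i = 0; i < fnames.size(); i += step){
    // Careful not to run off the end of the vector
    const std::size_t end = std::min(i + step, fnames.size());
    chunks.emplace_back(fnames.begin() + i, fnames.begin() + end);
  }
  return chunks;
}
```

With 120 file names and a step of 50, this yields three slices of 50, 50 and 20 names.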
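The per-file choice of location in LocateSAMFiles() is a small ranking: an existing BlueArc copy (only when not on the grid) beats a "dcache:" location, which beats an "enstore:" location. The simplified sketch below restates that ranking as a pure function. `PickLocation` is hypothetical, the `fileExistsIn` callback stands in for the stat() check, `onGrid` stands in for RunningOnGrid(), the $NOVA_* path rewriting is omitted, and the prefix is stripped unconditionally rather than only for "/pnfs/" paths.

```cpp
#include <functional>
#include <string>
#include <vector>

namespace {
bool StartsWith(const std::string& s, const std::string& prefix)
{
  return s.compare(0, prefix.size(), prefix) == 0;
}

// Drop the "(...)" suffix from the end of a location, if present
std::string StripParen(std::string s)
{
  const auto p = s.find('(');
  if(p != std::string::npos) s.resize(p);
  return s;
}
}

// Hypothetical, simplified version of the ranking in LocateSAMFiles():
// bluearc ("novadata:") = 3 if the file exists there, "dcache:" = 2,
// "enstore:" = 1. Returns an empty string if no location is usable.
std::string PickLocation(const std::vector<std::string>& locs, bool onGrid,
                         const std::function<bool(const std::string&)>& fileExistsIn)
{
  int best = 0;
  std::string resolved;
  for(const std::string& loc: locs){
    if(!onGrid && best < 3 && StartsWith(loc, "novadata:")){
      const std::string dir = loc.substr(9);
      if(fileExistsIn(dir)){ best = 3; resolved = dir; }
    }
    if(best < 2 && StartsWith(loc, "dcache:")){
      best = 2;
      resolved = StripParen(loc.substr(7));
    }
    if(best < 1 && StartsWith(loc, "enstore:")){
      best = 1;
      resolved = StripParen(loc.substr(8));
    }
  }
  return resolved;
}
```

As in the original, the order of the candidate locations does not matter: a lower-ranked location never replaces a higher-ranked one.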
std::vector< std::string > ana::SAMQuerySource::LocationsForSAMQuery ( const std::string &  str,
int  stride,
int  offset 
)
protected

Definition at line 137 of file SAMQuerySource.cxx.

{
  TString query = str;

  // This is an actual query
  if(query.Contains(' ') && RunningOnGrid()){
    // On the grid we want to convert that to a dataset we can snapshot below
    query = EnsureDataset(query.Data());
  }

  // This is a dataset name
  if(!query.Contains(' ')){
    if(getenv("CAFANA_USE_SNAPSHOTS")){
      query = "dataset_def_name_newest_snapshot "+query;
    }
    else{
      // Take one snapshot between all the jobs and share that
      if(RunningOnGrid()) query = EnsureSnapshot(query.Data());

      query = "defname: "+query;
    }
  }

  if(stride > 1){
    query += TString::Format(" with stride %d", stride).Data();
    if(offset > 0){
      query += TString::Format(" offset %d", offset).Data();
    }
  }

  std::cout << "Looking up files matching '" << query << "' using SAM...\n";

  std::vector<std::string> files;

  ifdh i(IFDHBaseURI());
  i.set_debug("0"); // shut up
  try{
    files = i.translateConstraints(query.Data());
  }
  catch(ifdh_util_ns::WebAPIException&){
    // I like my error message below better, since this could well be a
    // mistyped dataset name.
  }

  // IFDH likes to give back an empty string as the last response
  // https://cdcvs.fnal.gov/redmine/issues/6718
  // Drop it before checking whether anything was found.
  if(!files.empty() && files.back().empty()){
    files.pop_back();
  }

  if(files.empty()){
    std::cerr << "\nCan't find any files matching '" << str
              << "'. Aborting..." << std::endl;
    abort();
  }

  return LocateSAMFiles(files);
}
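The query string that LocationsForSAMQuery() hands to translateConstraints() follows a few simple rules: a string without spaces is treated as a dataset definition name, and a stride greater than 1 (plus an offset greater than 0) is appended. A hypothetical pure-function sketch of the off-grid path, leaving out the grid-only EnsureDataset()/EnsureSnapshot() calls and taking the $CAFANA_USE_SNAPSHOTS check as a boolean:

```cpp
#include <string>

// Hypothetical sketch of the query construction in LocationsForSAMQuery(),
// off-grid path only (no EnsureDataset()/EnsureSnapshot() calls).
//   useSnapshots: stands in for getenv("CAFANA_USE_SNAPSHOTS") being set
std::string BuildSAMQuery(std::string query, bool useSnapshots, int stride, int offset)
{
  // No spaces means this is a dataset (definition) name
  const bool isDatasetName = query.find(' ') == std::string::npos;
  if(isDatasetName){
    query = (useSnapshots ? "dataset_def_name_newest_snapshot " : "defname: ") + query;
  }

  // Offset is only meaningful together with a stride
  if(stride > 1){
    query += " with stride " + std::to_string(stride);
    if(offset > 0) query += " offset " + std::to_string(offset);
  }
  return query;
}
```

For example, `BuildSAMQuery("mydef", false, 4, 2)` yields `defname: mydef with stride 4 offset 2`, while a query that already contains spaces passes through unchanged apart from the stride clause.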

The documentation for this class was generated from the following files: