All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
glob.py
Go to the documentation of this file.
1 import glob
2 import numpy as np
3 import uproot
4 import pandas as pd
5 from tqdm.auto import tqdm
6 from multiprocessing import Pool
7 import multiprocessing
8 from . import names
9 import dill
10 
class NTupleProc(object):
    """Named wrapper around a callable that is applied to a dataframe.

    Instances are falsy when no callable is attached, and serialize their
    function with dill so they survive pickling for multiprocessing.
    """
    def __init__(self, f=None, name="None"):
        self.f = f
        self.name = name

    def __call__(self, df):
        """Apply the wrapped callable to df."""
        return self.f(df)

    def __bool__(self):
        """True when a callable has been attached."""
        return self.f is not None

    # dill (unlike plain pickle) can serialize lambdas and closures, which is
    # needed when shipping these objects to worker processes.
    def __getstate__(self):
        return dill.dumps({"f": self.f, "name": self.name})

    def __setstate__(self, state):
        restored = dill.loads(state)
        self.f = restored["f"]
        self.name = restored["name"]
30 
31 
32 
33 def _makedf(dfs):
34  if not isinstance(dfs, tuple):
35  assert(isinstance(dfs, pd.DataFrame))
36  dfs = [dfs]
37  else:
38  dfs = list(dfs)
39 
40  npad = [max([len(b.split(".")) for b in df.columns]) for df in dfs]
41  def pad(b, i):
42  while len(b) < npad[i]:
43  b.append("")
44  return tuple(b)
45 
46  for i in range(len(dfs)):
47  dfs[i].columns = pd.MultiIndex.from_tuples([pad(b.split("."), i) for b in dfs[i].columns])
48 
49  # set the index name if not present
50  for i in range(len(dfs)):
51  if len(dfs[i].index.names) == 1 and dfs[i].index.names[0] is None:
52  dfs[i].index = dfs[i].index.set_names(["entry"])
53 
54  return dfs
55 
def _loaddf(inp):
    """Pool worker: load the W and E cryostat trees from one ROOT file.

    inp is a tuple (fname, branches, index, applyf) so this can be driven by
    Pool.imap_unordered. A "__ntuple" level (index for W, index+1 for E) is
    prepended to the row index so rows stay traceable to their source file.

    Returns a single DataFrame with the E rows concatenated after the W rows.
    """
    fname, branches, index, applyf = inp
    with uproot.open(fname) as f:
        dfW = _makedf(f[names.folderW][names.tname].arrays(branches, library="pd"))
        dfE = _makedf(f[names.folderE][names.tname].arrays(branches, library="pd"))
        if applyf:
            dfW = applyf(*dfW)
            dfE = applyf(*dfE)
        else:
            dfW = dfW[0]
            dfE = dfE[0]
        # Set an index on the NTuple number to make sure we keep track of what is where
        dfW["__ntuple"] = index
        dfW.set_index("__ntuple", append=True, inplace=True)
        dfW = dfW.reorder_levels([dfW.index.nlevels-1] + list(range(0, dfW.index.nlevels-1)))

        dfE["__ntuple"] = index + 1
        dfE.set_index("__ntuple", append=True, inplace=True)
        dfE = dfE.reorder_levels([dfE.index.nlevels-1] + list(range(0, dfE.index.nlevels-1)))

        # BUGFIX: DataFrame.append() was removed in pandas 2.0; pd.concat is
        # the equivalent replacement and works on all pandas versions in use.
        return pd.concat([dfW, dfE])
78 
def _process(inp):
    """Pool worker: open the ROOT file named by inp[0] and histogram it with
    the remaining arguments (branches, vars, whens, bins)."""
    fname, *rest = inp
    with uproot.open(fname) as rootf:
        return _do_process(rootf, *rest)
83 
def _histogram_cryostat(df, vals, weights, variables, whens, runs, bins):
    """Build the nested {run: {var.name: {when.name: histogram}}} dict for one
    cryostat's dataframe. weights entries may be None (no selection)."""
    out = {}
    for r in runs:
        out[r] = {}
        # Hoisted: the run mask is the same for every var/when combination.
        in_run = df.meta.run == r
        for val, var in zip(vals, variables):
            out[r][var.name] = {}
            for w, when in zip(weights, whens):
                if w is None:
                    hist = np.histogram(val[in_run], bins=bins)
                else:
                    hist = np.histogram(val[w & in_run], bins=bins)
                out[r][var.name][when.name] = hist
    return out

def _do_process(rootf, branches, vars, whens, bins):
    """Histogram every (run, variable, selection) combination in one open
    ROOT file, separately for the W and E cryostat trees.

    Returns {"W": {...}, "E": {...}} nested as run -> var name -> when name
    -> (counts, bin edges) from np.histogram.
    """
    dfW = _makedf(rootf[names.folderW][names.tname].arrays(branches, library="pd"))
    dfE = _makedf(rootf[names.folderE][names.tname].arrays(branches, library="pd"))

    valW = [v(dfW) for v in vars]
    valE = [v(dfE) for v in vars]

    wW = [w(dfW) if w else None for w in whens]
    wE = [w(dfE) if w else None for w in whens]

    # NOTE(review): the run list comes from the W tree and is reused for the
    # E tree as well -- assumes both trees cover the same runs; confirm.
    runs = dfW.meta.run.unique()

    return {
        "W": _histogram_cryostat(dfW, valW, wW, vars, whens, runs, bins),
        "E": _histogram_cryostat(dfE, valE, wE, vars, whens, runs, bins),
    }
125 
class NTupleGlob(object):
    """A collection of ntuple ROOT files (from a glob pattern or an explicit
    list) that can be loaded into one DataFrame or histogrammed in parallel."""

    def __init__(self, g, branches):
        """g: glob pattern string, or a list of file names used as-is.
        branches: default branch list read by dataframe()/histogram()."""
        if isinstance(g, list):
            self.glob = g
        else:
            self.glob = glob.glob(g)
        self.branches = branches

    def dataframe(self, branches=None, maxfile=None, nproc=1, f=None):
        """Load every file in the glob into a single concatenated DataFrame.

        branches: branch list (defaults to self.branches).
        maxfile: truncate the glob to the first maxfile files.
        nproc: worker-process count, or "auto" for one per CPU.
        f: optional per-file callable applied to the loaded dataframe(s).
        """
        if nproc == "auto":
            nproc = multiprocessing.cpu_count()
        if branches is None:
            branches = self.branches

        thisglob = self.glob
        if maxfile:
            thisglob = thisglob[:maxfile]

        ret = []
        with Pool(processes=nproc) as pool:
            # Each file gets an even __ntuple index; the worker uses index+1
            # for the E cryostat, so indexes stay unique across files.
            thisglob = [(g, branches, i*2, f) for i,g in enumerate(thisglob)]
            for df in tqdm(pool.imap_unordered(_loaddf, thisglob), total=len(thisglob), unit="file", delay=5):
                ret.append(df)

        ret = pd.concat(ret, axis=0, ignore_index=False)

        # Fix the index so that we don't need __ntuple: renumber "entry" to be
        # globally unique across files, then drop the helper level.
        sub_index = ret.index.names[2:]
        ret = ret.reset_index()
        ret.entry = ret.groupby(["__ntuple", "entry"]).ngroup()
        ret.set_index(["entry"] + list(sub_index), inplace=True, verify_integrity=True)
        ret.sort_index(inplace=True)
        del ret["__ntuple"]

        return ret

    def histogram(self, var, bins, when=NTupleProc(), flatten_runs=False, flatten_cryo=False, maxfile=None, nproc=1):
        """Histogram variable(s) over all files, optionally in parallel.

        var: NTupleProc (or list of them) producing the value to histogram.
        bins: passed to np.histogram.
        when: NTupleProc selection (or list); the default falsy selection
              applies no cut.
        flatten_runs / flatten_cryo: sum the corresponding result level away.
        maxfile, nproc: as in dataframe().

        Returns a nested dict [cryo][run][var name][when name] -> (counts,
        edges), with levels removed per the flatten flags; when only the
        default selection is used, the selection-name level is dropped.
        """
        if nproc == "auto":
            nproc = multiprocessing.cpu_count()

        if not isinstance(var, list):
            var = [var]

        if not isinstance(when, list):
            when = [when]

        ret = {}

        thisglob = self.glob
        if maxfile:
            thisglob = thisglob[:maxfile]

        globdata = [(f, self.branches, var, when, bins) for f in thisglob]

        # Accumulate per-file histograms by summing counts bin-by-bin.
        with Pool(processes=nproc) as pool:
            for hists in tqdm(pool.imap_unordered(_process, globdata), total=len(globdata), unit="file", delay=5):
                for cname in hists.keys():
                    for runname in hists[cname].keys():
                        for varname in hists[cname][runname].keys():
                            for whenname in hists[cname][runname][varname].keys():
                                hist = self._load_histogram(cname, runname, varname, whenname, ret)
                                if hist is None:
                                    ret[cname][runname][varname][whenname] = hists[cname][runname][varname][whenname]
                                else:
                                    ret[cname][runname][varname][whenname] = self._hadd(hist, hists[cname][runname][varname][whenname])

        # Sum the two cryostats together if requested (drops the E/W level).
        flatret_cryo = {}
        if flatten_cryo:
            for runname in ret["E"].keys():
                flatret_cryo[runname] = {}
                for valname in ret["E"][runname].keys():
                    flatret_cryo[runname][valname] = {}
                    for whenname in ret["E"][runname][valname].keys():
                        flatret_cryo[runname][valname][whenname] = self._hadd(ret["E"][runname][valname][whenname], ret["W"][runname][valname][whenname])
            ret = flatret_cryo

        # Sum all runs together if requested (drops the run level).
        if flatten_runs:
            flatret_run = {}
            if not flatten_cryo:
                flatret_run["E"] = {}
                flatret_run["W"] = {}

            histlist = [ret] if flatten_cryo else [ret["E"], ret["W"]]
            makeflatlist = [flatret_run] if flatten_cryo else [flatret_run["E"], flatret_run["W"]]

            for hists, makeflat in zip(histlist, makeflatlist):
                run0 = list(hists.keys())[0]

                for valname in hists[run0].keys():
                    makeflat[valname] = {}
                    for whenname in hists[run0][valname].keys():
                        makeflat[valname][whenname] = self._hadd(*[hists[runname][valname][whenname] for runname in hists.keys()])
            ret = flatret_run

        # BUGFIX: an unconditional "return ret" here previously made the
        # default-selection unwrapping below unreachable dead code.
        # If only the default (falsy, name "None") selection was given, strip
        # the selection-name level from the returned dict.
        if len(when) == 1 and not when[0]:
            if flatten_runs and flatten_cryo:
                for varname in ret.keys():
                    ret[varname] = ret[varname]["None"]
            elif flatten_runs:
                for cname in ret.keys():
                    for varname in ret[cname].keys():
                        ret[cname][varname] = ret[cname][varname]["None"]
            elif flatten_cryo:
                for runname in ret.keys():
                    for varname in ret[runname].keys():
                        ret[runname][varname] = ret[runname][varname]["None"]
            else:
                for cname in ret.keys():
                    for runname in ret[cname].keys():
                        for varname in ret[cname][runname].keys():
                            ret[cname][runname][varname] = ret[cname][runname][varname]["None"]

        return ret

    def _hadd(self, *hs):
        """Sum np.histogram results bin-by-bin; keeps the first one's edges
        (all inputs are built with the same bins)."""
        Ns = [N for N,_ in hs]
        return np.sum(Ns, axis=0), hs[0][1]

    def _load_histogram(self, cryo, run, valname, whenname, hist_dict):
        """Walk (and create as needed) the cryo/run/valname levels of
        hist_dict; return the stored histogram for whenname, or None if it is
        not present yet."""
        for level in [cryo, run, valname]:
            if level not in hist_dict:
                hist_dict[level] = {}
            hist_dict = hist_dict[level]

        if whenname not in hist_dict:
            return None
        else:
            return hist_dict[whenname]
257 
258 
259 
def __getstate__
Definition: glob.py:23
def _process
Definition: glob.py:79
auto enumerate(Iterables &&...iterables)
Range-for loop helper tracking the number of iterations.
Definition: enumerate.h:69
def _makedf
Definition: glob.py:33
def _load_histogram
Definition: glob.py:247
def _loaddf
Definition: glob.py:56
def __setstate__
Definition: glob.py:26
auto zip(Iterables &&...iterables)
Range-for loop helper iterating across many collections at the same time.
Definition: zip.h:295
def _do_process
Definition: glob.py:84
list
Definition: file_to_url.sh:28