26 #include "art/Framework/Core/ReplicatedProducer.h"
27 #include "art/Framework/Principal/Event.h"
28 #include "art/Framework/Services/Registry/ServiceHandle.h"
29 #include "art_root_io/TFileService.h"
30 #include "art/Framework/Core/ModuleMacros.h"
31 #include "art/Utilities/make_tool.h"
32 #include "canvas/Persistency/Common/Ptr.h"
33 #include "messagefacility/MessageLogger/MessageLogger.h"
34 #include "cetlib/cpu_timer.h"
36 #include "tbb/parallel_for.h"
37 #include "tbb/blocked_range.h"
38 #include "tbb/task_arena.h"
39 #include "tbb/spin_mutex.h"
40 #include "tbb/concurrent_vector.h"
47 #include "sbndaq-artdaq-core/Overlays/ICARUS/PhysCrateFragment.hh"
51 #include "icarus_signal_processing/ICARUSSigProcDefs.h"
52 #include "icarus_signal_processing/WaveformTools.h"
62 explicit DaqDecoderICARUSTPC(fhicl::ParameterSet
const & pset, art::ProcessingFrame
const& frame);
66 virtual void configure(fhicl::ParameterSet
const & pset);
67 virtual void produce(art::Event &
e, art::ProcessingFrame
const& frame);
68 virtual void beginJob(art::ProcessingFrame
const& frame);
69 virtual void endJob(art::ProcessingFrame
const& frame);
88 art::Handle<artdaq::Fragments>& fragmentsHandle,
100 void operator()(
const tbb::blocked_range<size_t>& range)
const
102 for (
size_t idx = range.begin(); idx < range.end(); idx++)
115 void saveRawDigits(
const icarus_signal_processing::ArrayFloat&,
116 const icarus_signal_processing::VectorFloat&,
117 const icarus_signal_processing::VectorFloat&,
118 const icarus_signal_processing::VectorInt&,
152 art::ReplicatedProducer(pset, frame),
155 fGeometry = lar::providerFrom<geo::Geometry>();
160 int max_concurrency = tbb::this_task_arena::max_concurrency();
162 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
" ==> concurrency: " << max_concurrency << std::endl;
165 const fhicl::ParameterSet& decoderToolParams = pset.get<fhicl::ParameterSet>(
"DecoderTool");
167 fDecoderToolVec.resize(max_concurrency);
169 for(
auto& decoderTool : fDecoderToolVec)
172 decoderTool = art::make_tool<IDecoderFilter>(decoderToolParams);
177 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"ICARUS has " << fGeometry->Nchannels() <<
" in total with " << fGeometry->Views().size() <<
" views" << std::endl;
181 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"WireID: " << wireID << std::endl;
185 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"From geo, first WireID: " << firstWireID << std::endl;
189 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"Channel: " << channel << std::endl;
191 for(
size_t thePlane = 0; thePlane < 3; thePlane++)
196 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"thePlane: " << thePlane <<
", WireID: " << tempWireID <<
", channel: " <<
197 fGeometry->PlaneWireToChannel(tempWireID) <<
", view: " << fGeometry->View(tempPlaneID) << std::endl;
200 fFragmentOffset = channel / 576;
205 for(
const auto& fragmentLabel : fFragmentsLabelVec)
207 produces<std::vector<raw::RawDigit>>(fragmentLabel.instance());
209 if (fOutputRawWaveform)
210 produces<std::vector<raw::RawDigit>>(fragmentLabel.instance() + fOutputRawWavePath);
212 if (fOutputCorrection)
213 produces<std::vector<raw::RawDigit>>(fragmentLabel.instance() + fOutputCoherentPath);
217 mf::LogInfo(
"DaqDecoderICARUSTPC") <<
"DaqDecoderICARUSTPC configured\n";
234 fFragmentsLabelVec = pset.get<std::vector<art::InputTag>>(
"FragmentsLabelVec", std::vector<art::InputTag>()={
"daq:PHYSCRATEDATA"});
263 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"**** Processing raw data fragments ****" << std::endl;
266 int max_concurrency = tbb::this_task_arena::max_concurrency();
268 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
" ==> concurrency: " << max_concurrency << std::endl;
271 std::cout <<
"------------------------------------------------------------------------------------------" << std::endl;
272 std::cout <<
"===> Run: " <<
event.id().run() <<
", subrn: " <<
event.id().subRun() <<
", event: " <<
event.id().event() << std::endl;
274 cet::cpu_timer theClockTotal;
276 theClockTotal.start();
283 art::Handle<artdaq::Fragments> daq_handle;
284 event.getByLabel(fragmentLabel, daq_handle);
291 auto const clockData = art::ServiceHandle<detinfo::DetectorClocksService>()->DataFor(event);
296 concurrentRawRawDigits,
299 tbb::parallel_for(tbb::blocked_range<size_t>(0, daq_handle->size()), fragmentProcessing);
302 RawDigitCollectionPtr rawDigitCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(concurrentRawDigits.begin()),
303 std::move_iterator(concurrentRawDigits.end()));
306 std::sort(rawDigitCollection->begin(),rawDigitCollection->end(),[](
const auto&
left,
const auto&
right){
return left.Channel() <
right.Channel();});
309 event.put(std::move(rawDigitCollection), fragmentLabel.instance());
314 RawDigitCollectionPtr rawRawDigitCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(concurrentRawRawDigits.begin()),
315 std::move_iterator(concurrentRawRawDigits.end()));
318 std::sort(rawRawDigitCollection->begin(),rawRawDigitCollection->end(),[](
const auto&
left,
const auto&
right){
return left.Channel() <
right.Channel();});
321 event.put(std::move(rawRawDigitCollection),fragmentLabel.instance() +
fOutputRawWavePath);
327 RawDigitCollectionPtr coherentCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(coherentRawDigits.begin()),
328 std::move_iterator(coherentRawDigits.end()));
331 std::sort(coherentCollection->begin(),coherentCollection->end(),[](
const auto&
left,
const auto&
right){
return left.Channel() <
right.Channel();});
338 theClockTotal.stop();
340 double totalTime = theClockTotal.accumulated_real_time();
342 mf::LogInfo(
"DaqDecoderICARUSTPC") <<
"==> DaqDecoderICARUSTPC total time: " << totalTime << std::endl;
349 art::Handle<artdaq::Fragments> fragmentHandle,
354 cet::cpu_timer theClockProcess;
356 theClockProcess.start();
358 art::Ptr<artdaq::Fragment> fragmentPtr(fragmentHandle, idx);
360 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"--> Processing fragment ID: " << fragmentPtr->fragmentID() << std::endl;
361 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
" ==> Current thread index: " << tbb::this_task_arena::current_thread_index() << std::endl;
371 icarus::PhysCrateFragment physCrateFragment(*fragmentPtr);
379 theClockProcess.stop();
381 double totalTime = theClockProcess.accumulated_real_time();
384 icarus_signal_processing::WaveformTools<float> waveformTools;
387 icarus_signal_processing::VectorFloat locPedsVec(decoderTool->
getWaveLessCoherent().size(),0.);
388 icarus_signal_processing::VectorFloat locFullRMSVec(locPedsVec.size(),0.);
389 icarus_signal_processing::VectorFloat locTruncRMSVec(locPedsVec.size(),0.);
390 icarus_signal_processing::VectorInt locNumTruncBins(locPedsVec.size(),0);
391 icarus_signal_processing::VectorInt locRangeBins(locPedsVec.size(),0);
393 const icarus_signal_processing::VectorInt& channelVec = decoderTool->
getChannelIDs();
394 const icarus_signal_processing::ArrayFloat& corWaveforms = decoderTool->
getWaveLessCoherent();
396 icarus_signal_processing::ArrayFloat pedCorWaveforms(corWaveforms.size(),icarus_signal_processing::VectorFloat(corWaveforms[0].
size()));
398 for(
size_t idx = 0; idx < corWaveforms.size(); idx++)
401 waveformTools.getPedestalCorrectedWaveform(corWaveforms[idx],
402 pedCorWaveforms[idx],
407 locNumTruncBins[idx],
411 saveRawDigits(pedCorWaveforms, locPedsVec, locTruncRMSVec, channelVec, rawDigitCollection);
422 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
"--> Exiting fragment processing for thread: " << tbb::this_task_arena::current_thread_index() <<
", time: " << totalTime << std::endl;
427 const icarus_signal_processing::VectorFloat& pedestalVec,
428 const icarus_signal_processing::VectorFloat& rmsVec,
429 const icarus_signal_processing::VectorInt& channelVec,
432 if (!dataArray.empty())
434 cet::cpu_timer theClockSave;
436 theClockSave.start();
440 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
" --> saving rawdigits for " << dataArray.size() <<
" channels" << std::endl;
443 for(
size_t chanIdx = 0; chanIdx != dataArray.size(); chanIdx++)
446 if (channelVec[chanIdx] < 0)
continue;
448 const icarus_signal_processing::VectorFloat& dataVec = dataArray[chanIdx];
451 std::transform(dataVec.begin(),dataVec.end(),wvfm.begin(),[](
const auto& val){
return short(std::round(val));});
453 ConcurrentRawDigitCol::iterator newObjItr = rawDigitCol.emplace_back(channelVec[chanIdx],wvfm.size(),wvfm);
454 newObjItr->SetPedestal(pedestalVec[chanIdx],rmsVec[chanIdx]);
459 double totalTime = theClockSave.accumulated_real_time();
461 mf::LogDebug(
"DaqDecoderICARUSTPC") <<
" --> done with save, time: " << totalTime << std::endl;
471 mf::LogInfo(
"DaqDecoderICARUSTPC") <<
"Looked at " <<
fNumEvent <<
" events" << std::endl;
DaqDecoderICARUSTPC(fhicl::ParameterSet const &pset, art::ProcessingFrame const &frame)
ConcurrentRawDigitCol & fCoherentCollection
virtual void configure(fhicl::ParameterSet const &pset)
Utilities related to art service access.
tbb::concurrent_vector< raw::RawDigit > ConcurrentRawDigitCol
virtual const icarus_signal_processing::VectorInt getChannelIDs() const =0
Recover the channels for the processed fragment.
std::vector< std::unique_ptr< IDecoderFilter > > fDecoderToolVec
Decoder tools.
float fSigmaForTruncation
This determines the point at which we truncate bins for the RMS calc.
bool fOutputCorrection
Should we output the coherent noise correction vectors?
void processSingleFragment(size_t, detinfo::DetectorClocksData const &clockData, art::Handle< artdaq::Fragments >, ConcurrentRawDigitCol &, ConcurrentRawDigitCol &, ConcurrentRawDigitCol &) const
ConcurrentRawDigitCol & fRawRawDigitCollection
The data type to uniquely identify a Plane.
std::string fOutputCoherentPath
Path to assign to the output if asked for.
art::Handle< artdaq::Fragments > & fFragmentsHandle
std::vector< short > ADCvector_t
Type representing a (compressed) vector of ADC counts.
IDecoderFilter interface class definiton.
Definition of basic raw digits.
std::size_t size(FixedBins< T, C > const &) noexcept
int fNumEvent
Number of events seen.
virtual ~DaqDecoderICARUSTPC()
Destructor.
bool fOutputRawWaveform
Should we output pedestal corrected (not noise filtered)?
multiThreadFragmentProcessing(DaqDecoderICARUSTPC const &parent, detinfo::DetectorClocksData const &clockData, art::Handle< artdaq::Fragments > &fragmentsHandle, ConcurrentRawDigitCol &rawDigitCollection, ConcurrentRawDigitCol &rawRawDigitCollection, ConcurrentRawDigitCol &coherentCollection)
unsigned int fPlaneToSimulate
Use to get fragment offset.
virtual const icarus_signal_processing::ArrayFloat getWaveLessCoherent() const =0
Recover the waveforms less coherent noise.
virtual void beginJob(art::ProcessingFrame const &frame)
Begin job method.
This provides an art tool interface definition for tools which "decode" artdaq fragments into LArSoft...
std::vector< raw::RawDigit > RawDigitCollection
const DaqDecoderICARUSTPC & fDaqDecoderICARUSTPC
virtual void process_fragment(detinfo::DetectorClocksData const &clockData, const artdaq::Fragment &fragment)=0
Given a set of recob hits, run DBscan to form 3D clusters.
std::unique_ptr< RawDigitCollection > RawDigitCollectionPtr
geo::GeometryCore const * fGeometry
pointer to Geometry service
virtual const icarus_signal_processing::ArrayFloat getCorrectedMedians() const =0
Recover the correction median values.
detinfo::DetectorClocksData const & fClockData
size_t fFragmentOffset
The fragment offset to set channel numbering.
void operator()(const tbb::blocked_range< size_t > &range) const
ConcurrentRawDigitCol & fRawDigitCollection
Description of geometry of one entire detector.
virtual void endJob(art::ProcessingFrame const &frame)
End job method.
virtual void produce(art::Event &e, art::ProcessingFrame const &frame)
void saveRawDigits(const icarus_signal_processing::ArrayFloat &, const icarus_signal_processing::VectorFloat &, const icarus_signal_processing::VectorFloat &, const icarus_signal_processing::VectorInt &, ConcurrentRawDigitCol &) const
std::vector< art::InputTag > fFragmentsLabelVec
The input artdaq fragment label vector (for more than one)
Contains all timing reference information for the detector.
virtual const icarus_signal_processing::VectorFloat getFullRMSVals() const =0
Recover the full RMS before coherent noise.
virtual const icarus_signal_processing::VectorFloat getPedestalVals() const =0
Recover the pedestals for each channel.
virtual const icarus_signal_processing::ArrayFloat getRawWaveforms() const =0
Recover the original raw waveforms.
constexpr PlaneID const & planeID() const
unsigned int ChannelID_t
Type representing the ID of a readout channel.
std::string fOutputRawWavePath
Path to assign to the output if asked for.
art framework interface to geometry description
BEGIN_PROLOG could also be cout