26 #include "art/Framework/Core/ReplicatedProducer.h" 
   27 #include "art/Framework/Principal/Event.h" 
   28 #include "art/Framework/Services/Registry/ServiceHandle.h" 
   29 #include "art_root_io/TFileService.h" 
   30 #include "art/Framework/Core/ModuleMacros.h" 
   31 #include "art/Utilities/make_tool.h" 
   32 #include "canvas/Persistency/Common/Ptr.h" 
   33 #include "messagefacility/MessageLogger/MessageLogger.h" 
   34 #include "cetlib/cpu_timer.h" 
   36 #include "tbb/parallel_for.h" 
   37 #include "tbb/blocked_range.h" 
   38 #include "tbb/task_arena.h" 
   39 #include "tbb/spin_mutex.h" 
   40 #include "tbb/concurrent_vector.h" 
   47 #include "sbndaq-artdaq-core/Overlays/ICARUS/PhysCrateFragment.hh" 
   51 #include "icarus_signal_processing/ICARUSSigProcDefs.h" 
   52 #include "icarus_signal_processing/WaveformTools.h" 
   62     explicit DaqDecoderICARUSTPC(fhicl::ParameterSet 
const & pset, art::ProcessingFrame 
const& frame);
 
   66     virtual void configure(fhicl::ParameterSet 
const & pset);
 
   67     virtual void produce(art::Event & 
e, art::ProcessingFrame 
const& frame);
 
   68     virtual void beginJob(art::ProcessingFrame 
const& frame);
 
   69     virtual void endJob(art::ProcessingFrame 
const& frame);
 
   88                                       art::Handle<artdaq::Fragments>&    fragmentsHandle,
 
  100         void operator()(
const tbb::blocked_range<size_t>& range)
 const 
  102             for (
size_t idx = range.begin(); idx < range.end(); idx++)
 
  115     void saveRawDigits(
const icarus_signal_processing::ArrayFloat&, 
 
  116                        const icarus_signal_processing::VectorFloat&, 
 
  117                        const icarus_signal_processing::VectorFloat&,
 
  118                        const icarus_signal_processing::VectorInt&,
 
  152                       art::ReplicatedProducer(pset, frame),
 
  155     fGeometry = lar::providerFrom<geo::Geometry>();
 
  160     int max_concurrency = tbb::this_task_arena::max_concurrency();
 
  162     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"     ==> concurrency: " << max_concurrency << std::endl;
 
  165     const fhicl::ParameterSet& decoderToolParams = pset.get<fhicl::ParameterSet>(
"DecoderTool");
 
  167     fDecoderToolVec.resize(max_concurrency);
 
  169     for(
auto& decoderTool : fDecoderToolVec)
 
  172         decoderTool = art::make_tool<IDecoderFilter>(decoderToolParams);
 
  177     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"ICARUS has " << fGeometry->Nchannels() << 
" in total with " << fGeometry->Views().size() << 
" views" << std::endl;
 
  181     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"WireID: " << wireID << std::endl;
 
  185     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"From geo, first WireID: " << firstWireID << std::endl;
 
  189     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"Channel: " << channel << std::endl;
 
  191     for(
size_t thePlane = 0; thePlane < 3; thePlane++)
 
  196         mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"thePlane: " << thePlane << 
", WireID: " << tempWireID << 
", channel: " << 
 
  197         fGeometry->PlaneWireToChannel(tempWireID) << 
", view: " << fGeometry->View(tempPlaneID) << std::endl;
 
  200     fFragmentOffset = channel / 576;
 
  205     for(
const auto& fragmentLabel : fFragmentsLabelVec)
 
  207         produces<std::vector<raw::RawDigit>>(fragmentLabel.instance());
 
  209         if (fOutputRawWaveform)
 
  210             produces<std::vector<raw::RawDigit>>(fragmentLabel.instance() + fOutputRawWavePath);
 
  212         if (fOutputCorrection)
 
  213             produces<std::vector<raw::RawDigit>>(fragmentLabel.instance() + fOutputCoherentPath);
 
  217     mf::LogInfo(
"DaqDecoderICARUSTPC") << 
"DaqDecoderICARUSTPC configured\n";
 
  234     fFragmentsLabelVec  = pset.get<std::vector<art::InputTag>>(
"FragmentsLabelVec",  std::vector<art::InputTag>()={
"daq:PHYSCRATEDATA"});
 
  263     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"**** Processing raw data fragments ****" << std::endl;
 
  266     int max_concurrency = tbb::this_task_arena::max_concurrency();
 
  268     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"     ==> concurrency: " << max_concurrency << std::endl;
 
  271     std::cout << 
"------------------------------------------------------------------------------------------" << std::endl;
 
  272     std::cout << 
"===> Run: " << 
event.id().run() << 
", subrn: " << 
event.id().subRun() << 
", event: " << 
event.id().event() << std::endl;
 
  274     cet::cpu_timer theClockTotal;
 
  276     theClockTotal.start();
 
  283         art::Handle<artdaq::Fragments> daq_handle;
 
  284         event.getByLabel(fragmentLabel, daq_handle);
 
  291         auto const clockData = art::ServiceHandle<detinfo::DetectorClocksService>()->DataFor(event);
 
  296                                                          concurrentRawRawDigits,
 
  299         tbb::parallel_for(tbb::blocked_range<size_t>(0, daq_handle->size()), fragmentProcessing);
 
  302         RawDigitCollectionPtr rawDigitCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(concurrentRawDigits.begin()), 
 
  303                                                                                                 std::move_iterator(concurrentRawDigits.end()));
 
  306         std::sort(rawDigitCollection->begin(),rawDigitCollection->end(),[](
const auto& 
left,
const auto&
right){
return left.Channel() < 
right.Channel();});
 
  309         event.put(std::move(rawDigitCollection), fragmentLabel.instance());
 
  314             RawDigitCollectionPtr rawRawDigitCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(concurrentRawRawDigits.begin()), 
 
  315                                                                                                        std::move_iterator(concurrentRawRawDigits.end()));
 
  318             std::sort(rawRawDigitCollection->begin(),rawRawDigitCollection->end(),[](
const auto& 
left,
const auto&
right){
return left.Channel() < 
right.Channel();});
 
  321             event.put(std::move(rawRawDigitCollection),fragmentLabel.instance() + 
fOutputRawWavePath);
 
  327             RawDigitCollectionPtr coherentCollection = std::make_unique<std::vector<raw::RawDigit>>(std::move_iterator(coherentRawDigits.begin()), 
 
  328                                                                                                     std::move_iterator(coherentRawDigits.end()));
 
  331             std::sort(coherentCollection->begin(),coherentCollection->end(),[](
const auto& 
left,
const auto&
right){
return left.Channel() < 
right.Channel();});
 
  338     theClockTotal.stop();
 
  340     double totalTime = theClockTotal.accumulated_real_time();
 
  342     mf::LogInfo(
"DaqDecoderICARUSTPC") << 
"==> DaqDecoderICARUSTPC total time: " << totalTime << std::endl;
 
  349                                                 art::Handle<artdaq::Fragments>     fragmentHandle,
 
  354     cet::cpu_timer theClockProcess;
 
  356     theClockProcess.start();
 
  358     art::Ptr<artdaq::Fragment> fragmentPtr(fragmentHandle, idx);
 
  360     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"--> Processing fragment ID: " << fragmentPtr->fragmentID() << std::endl;
 
  361     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"    ==> Current thread index: " << tbb::this_task_arena::current_thread_index() << std::endl;
 
  371     icarus::PhysCrateFragment physCrateFragment(*fragmentPtr);
 
  379     theClockProcess.stop();
 
  381     double totalTime = theClockProcess.accumulated_real_time();
 
  384     icarus_signal_processing::WaveformTools<float> waveformTools;
 
  387     icarus_signal_processing::VectorFloat       locPedsVec(decoderTool->
getWaveLessCoherent().size(),0.);
 
  388     icarus_signal_processing::VectorFloat       locFullRMSVec(locPedsVec.size(),0.);
 
  389     icarus_signal_processing::VectorFloat       locTruncRMSVec(locPedsVec.size(),0.);
 
  390     icarus_signal_processing::VectorInt         locNumTruncBins(locPedsVec.size(),0);
 
  391     icarus_signal_processing::VectorInt         locRangeBins(locPedsVec.size(),0);
 
  393     const icarus_signal_processing::VectorInt&  channelVec   = decoderTool->
getChannelIDs();
 
  394     const icarus_signal_processing::ArrayFloat& corWaveforms = decoderTool->
getWaveLessCoherent();
 
  396     icarus_signal_processing::ArrayFloat        pedCorWaveforms(corWaveforms.size(),icarus_signal_processing::VectorFloat(corWaveforms[0].
size()));
 
  398     for(
size_t idx = 0; idx < corWaveforms.size(); idx++)
 
  401         waveformTools.getPedestalCorrectedWaveform(corWaveforms[idx],
 
  402                                                    pedCorWaveforms[idx],
 
  407                                                    locNumTruncBins[idx],
 
  411     saveRawDigits(pedCorWaveforms, locPedsVec, locTruncRMSVec, channelVec, rawDigitCollection);
 
  422     mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"--> Exiting fragment processing for thread: " << tbb::this_task_arena::current_thread_index() << 
", time: " << totalTime << std::endl;
 
  427                                         const icarus_signal_processing::VectorFloat& pedestalVec,
 
  428                                         const icarus_signal_processing::VectorFloat& rmsVec,
 
  429                                         const icarus_signal_processing::VectorInt&   channelVec,
 
  432     if (!dataArray.empty())
 
  434         cet::cpu_timer theClockSave;
 
  436         theClockSave.start();
 
  440         mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"    --> saving rawdigits for " << dataArray.size() << 
" channels" << std::endl;
 
  443         for(
size_t chanIdx = 0; chanIdx != dataArray.size(); chanIdx++)
 
  446             if (channelVec[chanIdx] < 0) 
continue;
 
  448             const icarus_signal_processing::VectorFloat& dataVec = dataArray[chanIdx];
 
  451             std::transform(dataVec.begin(),dataVec.end(),wvfm.begin(),[](
const auto& val){
return short(std::round(val));});
 
  453             ConcurrentRawDigitCol::iterator newObjItr = rawDigitCol.emplace_back(channelVec[chanIdx],wvfm.size(),wvfm); 
 
  454             newObjItr->SetPedestal(pedestalVec[chanIdx],rmsVec[chanIdx]);
 
  459         double totalTime = theClockSave.accumulated_real_time();
 
  461         mf::LogDebug(
"DaqDecoderICARUSTPC") << 
"    --> done with save, time: " << totalTime << std::endl;
 
  471     mf::LogInfo(
"DaqDecoderICARUSTPC") << 
"Looked at " << 
fNumEvent << 
" events" << std::endl;
 
DaqDecoderICARUSTPC(fhicl::ParameterSet const &pset, art::ProcessingFrame const &frame)
ConcurrentRawDigitCol & fCoherentCollection
virtual void configure(fhicl::ParameterSet const &pset)
Utilities related to art service access. 
tbb::concurrent_vector< raw::RawDigit > ConcurrentRawDigitCol
virtual const icarus_signal_processing::VectorInt getChannelIDs() const =0
Recover the channels for the processed fragment. 
std::vector< std::unique_ptr< IDecoderFilter > > fDecoderToolVec
Decoder tools. 
float fSigmaForTruncation
This determines the point at which we truncate bins for the RMS calc. 
bool fOutputCorrection
Should we output the coherent noise correction vectors? 
void processSingleFragment(size_t, detinfo::DetectorClocksData const &clockData, art::Handle< artdaq::Fragments >, ConcurrentRawDigitCol &, ConcurrentRawDigitCol &, ConcurrentRawDigitCol &) const 
ConcurrentRawDigitCol & fRawRawDigitCollection
The data type to uniquely identify a Plane. 
std::string fOutputCoherentPath
Path to assign to the output if asked for. 
art::Handle< artdaq::Fragments > & fFragmentsHandle
std::vector< short > ADCvector_t
Type representing a (compressed) vector of ADC counts. 
IDecoderFilter interface class definiton. 
Definition of basic raw digits. 
std::size_t size(FixedBins< T, C > const &) noexcept
int fNumEvent
Number of events seen. 
virtual ~DaqDecoderICARUSTPC()
Destructor. 
bool fOutputRawWaveform
Should we output pedestal corrected (not noise filtered)? 
multiThreadFragmentProcessing(DaqDecoderICARUSTPC const &parent, detinfo::DetectorClocksData const &clockData, art::Handle< artdaq::Fragments > &fragmentsHandle, ConcurrentRawDigitCol &rawDigitCollection, ConcurrentRawDigitCol &rawRawDigitCollection, ConcurrentRawDigitCol &coherentCollection)
unsigned int fPlaneToSimulate
Use to get fragment offset. 
virtual const icarus_signal_processing::ArrayFloat getWaveLessCoherent() const =0
Recover the waveforms less coherent noise. 
virtual void beginJob(art::ProcessingFrame const &frame)
Begin job method. 
This provides an art tool interface definition for tools which "decode" artdaq fragments into LArSoft...
std::vector< raw::RawDigit > RawDigitCollection
const DaqDecoderICARUSTPC & fDaqDecoderICARUSTPC
virtual void process_fragment(detinfo::DetectorClocksData const &clockData, const artdaq::Fragment &fragment)=0
Given a set of recob hits, run DBscan to form 3D clusters. 
std::unique_ptr< RawDigitCollection > RawDigitCollectionPtr
geo::GeometryCore const * fGeometry
pointer to Geometry service 
virtual const icarus_signal_processing::ArrayFloat getCorrectedMedians() const =0
Recover the correction median values. 
detinfo::DetectorClocksData const & fClockData
size_t fFragmentOffset
The fragment offset to set channel numbering. 
void operator()(const tbb::blocked_range< size_t > &range) const 
ConcurrentRawDigitCol & fRawDigitCollection
Description of geometry of one entire detector. 
virtual void endJob(art::ProcessingFrame const &frame)
End job method. 
virtual void produce(art::Event &e, art::ProcessingFrame const &frame)
void saveRawDigits(const icarus_signal_processing::ArrayFloat &, const icarus_signal_processing::VectorFloat &, const icarus_signal_processing::VectorFloat &, const icarus_signal_processing::VectorInt &, ConcurrentRawDigitCol &) const 
std::vector< art::InputTag > fFragmentsLabelVec
The input artdaq fragment label vector (for more than one) 
Contains all timing reference information for the detector. 
virtual const icarus_signal_processing::VectorFloat getFullRMSVals() const =0
Recover the full RMS before coherent noise. 
virtual const icarus_signal_processing::VectorFloat getPedestalVals() const =0
Recover the pedestals for each channel. 
virtual const icarus_signal_processing::ArrayFloat getRawWaveforms() const =0
Recover the original raw waveforms. 
constexpr PlaneID const & planeID() const 
unsigned int ChannelID_t
Type representing the ID of a readout channel. 
std::string fOutputRawWavePath
Path to assign to the output if asked for. 
art framework interface to geometry description 
BEGIN_PROLOG could also be cout