All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThreadPool.cxx
Go to the documentation of this file.
2 
4 
5 #include <cassert>
6 #include <unistd.h>
7 
8 namespace ana
9 {
10  //----------------------------------------------------------------------
11  ThreadPool::ThreadPool(unsigned int maxThreads)
12  : fMaxThreads(maxThreads), fNumLiveThreads(0),
13  fTasksCompleted(0), fTotalTasks(0), fProgress(0)
14  {
15  if(maxThreads == 0){
16  fMaxThreads = sysconf(_SC_NPROCESSORS_ONLN);
17  }
18 
19  pthread_mutex_init(&fTasksLock, 0);
20  pthread_mutex_init(&fThreadsLock, 0);
21  pthread_mutex_init(&fProgressLock, 0);
22  }
23 
24  //----------------------------------------------------------------------
26  {
27  // This is safe, even if it's already been explicitly called
28  Finish();
29 
30  pthread_mutex_destroy(&fTasksLock);
31  pthread_mutex_destroy(&fThreadsLock);
32  pthread_mutex_destroy(&fProgressLock);
33 
34  delete fProgress;
35  }
36 
37  //----------------------------------------------------------------------
38  void ThreadPool::ShowProgress(const std::string& title)
39  {
40  if(!fProgress) fProgress = new Progress(title);
41  }
42 
43  //----------------------------------------------------------------------
45  {
46  void* junk;
47  for(pthread_t& th: fThreads) pthread_join(th, &junk);
48 
49  assert(fNumLiveThreads == 0);
50 
51  fThreads.clear(); // Make it safe to call a second time
52 
53  if(fProgress) fProgress->Done();
54  }
55 
56  //----------------------------------------------------------------------
57  void* ThreadPool::WorkerFunc(void* arg)
58  {
59  // We smuggle essentially the this pointer in through the argument
60  ThreadPool* pool = (ThreadPool*)arg;
61 
62  while(true){
63  pthread_mutex_lock(&pool->fTasksLock);
64  if(pool->fTasks.empty()){
65  // Nothing to do, commit suicide
66  pthread_mutex_unlock(&pool->fTasksLock);
67 
68  pthread_mutex_lock(&pool->fThreadsLock);
69  --pool->fNumLiveThreads;
70  pthread_mutex_unlock(&pool->fThreadsLock);
71 
72  return 0;
73  }
74  // Get a task to do
75  func_t task = pool->fTasks.front();
76  pool->fTasks.pop_front();
77  pthread_mutex_unlock(&pool->fTasksLock);
78 
79  // Actually do the user's work
80  task();
81 
82  pthread_mutex_lock(&pool->fProgressLock);
83  ++pool->fTasksCompleted;
84 
85  if(pool->fProgress)
86  pool->fProgress->SetProgress(pool->fTasksCompleted/double(pool->fTotalTasks));
87  pthread_mutex_unlock(&pool->fProgressLock);
88  }
89  }
90 
91  //----------------------------------------------------------------------
93  {
94  pthread_mutex_lock(&fTasksLock);
95  fTasks.push_back(func);
96  pthread_mutex_unlock(&fTasksLock);
97 
98  pthread_mutex_lock(&fProgressLock);
99  ++fTotalTasks;
100  pthread_mutex_unlock(&fProgressLock);
101 
102  pthread_mutex_lock(&fThreadsLock);
104  fThreads.push_back(pthread_t());
105  pthread_create(&fThreads.back(), 0, WorkerFunc, this);
106  ++fNumLiveThreads;
107  }
108  pthread_mutex_unlock(&fThreadsLock);
109  }
110 }
pthread_mutex_t fThreadsLock
Definition: ThreadPool.h:58
std::vector< pthread_t > fThreads
All threads we ever created.
Definition: ThreadPool.h:59
void Finish()
Wait for all threads to complete before returning.
Definition: ThreadPool.cxx:44
Progress * fProgress
Definition: ThreadPool.h:66
std::function< void(void)> func_t
The type of the user's worker functions.
Definition: ThreadPool.h:43
void AddTaskHelper(func_t func)
Definition: ThreadPool.cxx:92
process_name opflashCryoW ana
ThreadPool(unsigned int maxThreads=0)
Definition: ThreadPool.cxx:11
int fTotalTasks
How many tasks have we ever seen?
Definition: ThreadPool.h:65
pthread_mutex_t fProgressLock
Protects fTasksCompleted and fTotalTasks too.
Definition: ThreadPool.h:63
virtual ~ThreadPool()
Definition: ThreadPool.cxx:25
std::deque< func_t > fTasks
Actually, this is protecting fNumLiveThreads.
Definition: ThreadPool.h:55
void SetProgress(double frac)
Update the progress fraction between zero and one.
Definition: Progress.cxx:44
static void * WorkerFunc(void *arg)
Definition: ThreadPool.cxx:57
unsigned int fMaxThreads
Definition: ThreadPool.h:50
A very simple thread pool for use by Surface.
Definition: ThreadPool.h:17
A simple ascii-art progress bar.
Definition: Progress.h:9
void ShowProgress(const std::string &title)
Definition: ThreadPool.cxx:38
pthread_mutex_t fTasksLock
Definition: ThreadPool.h:54
void Done()
Call this when action is completed.
Definition: Progress.cxx:91
unsigned int fNumLiveThreads
Number of threads that are running.
Definition: ThreadPool.h:60