Eclipse SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2004-2019 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials
5 // are made available under the terms of the Eclipse Public License v2.0
6 // which accompanies this distribution, and is available at
7 // http://www.eclipse.org/legal/epl-v20.html
8 // SPDX-License-Identifier: EPL-2.0
9 /****************************************************************************/
15 // A thread class together with a pool and a task for parallelized computation
16 /****************************************************************************/
17 
18 #ifndef FXWorkerThread_h
19 #define FXWorkerThread_h
20 
21 // #define WORKLOAD_PROFILING
22 // at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING
23 // undefine to use summary report only
24 #define WORKLOAD_INTERVAL 100
25 
26 // ===========================================================================
27 // included modules
28 // ===========================================================================
#include <config.h>

#include <list>
#include <vector>
#include <fx.h>
#ifdef WORKLOAD_PROFILING
#include <chrono>
#include <limits>
#include <utils/common/MsgHandler.h>
#include <utils/common/ToString.h>
#endif
#include <utils/common/UtilExceptions.h>
40 
41 
42 // ===========================================================================
43 // class definitions
44 // ===========================================================================
49 class FXWorkerThread : public FXThread {
50 
51 public:
56  class Task {
57  public:
59  virtual ~Task() {};
60 
69  virtual void run(FXWorkerThread* context) = 0;
70 
77  void setIndex(const int newIndex) {
78  myIndex = newIndex;
79  }
80  private:
82  int myIndex;
83  };
84 
89  class Pool {
90  public:
97  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
98 #ifdef WORKLOAD_PROFILING
99  , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
100 #endif
101  {
102 #ifdef WORKLOAD_PROFILING
103  long long int timeDiff = 0;
104  for (int i = 0; i < 100; i++) {
105  const auto begin = std::chrono::high_resolution_clock::now();
106  const auto end = std::chrono::high_resolution_clock::now();
107  timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
108  }
109  //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
110 #endif
111  while (numThreads > 0) {
112  new FXWorkerThread(*this);
113  numThreads--;
114  }
115  }
116 
121  virtual ~Pool() {
122  clear();
123  }
124 
127  void clear() {
128  for (FXWorkerThread* const worker : myWorkers) {
129  delete worker;
130  }
131  myWorkers.clear();
132  }
133 
138  void addWorker(FXWorkerThread* const w) {
139  myWorkers.push_back(w);
140  }
141 
148  void add(Task* const t, int index = -1) {
149  if (index < 0) {
150  index = myRunningIndex % myWorkers.size();
151  }
152 #ifdef WORKLOAD_PROFILING
153  if (myRunningIndex == 0) {
154  for (FXWorkerThread* const worker : myWorkers) {
155  worker->startProfile();
156  }
157  myProfileStart = std::chrono::high_resolution_clock::now();
158  }
159 #endif
160  t->setIndex(myRunningIndex++);
161  myWorkers[index]->add(t);
162  }
163 
170  void addFinished(std::list<Task*>& tasks) {
171  myMutex.lock();
172  myFinishedTasks.splice(myFinishedTasks.end(), tasks);
173  myCondition.signal();
174  myMutex.unlock();
175  }
176 
178  myMutex.lock();
179  if (myException == nullptr) {
180  myException = new ProcessError(e);
181  }
182  myMutex.unlock();
183  }
184 
186  void waitAll(const bool deleteFinished = true) {
187  myMutex.lock();
188  while ((int)myFinishedTasks.size() < myRunningIndex) {
189  myCondition.wait(myMutex);
190  }
191 #ifdef WORKLOAD_PROFILING
192  if (myRunningIndex > 0) {
193  const auto end = std::chrono::high_resolution_clock::now();
194  const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
195  double minLoad = std::numeric_limits<double>::max();
196  double maxLoad = 0.;
197  for (FXWorkerThread* const worker : myWorkers) {
198  const double load = worker->endProfile(elapsed);
199  minLoad = MIN2(minLoad, load);
200  maxLoad = MAX2(maxLoad, load);
201  }
202 #ifdef WORKLOAD_INTERVAL
203  myTotalMaxLoad += maxLoad;
204  myTotalSpread += maxLoad / minLoad;
205  myNumBatches++;
206  if (myNumBatches % WORKLOAD_INTERVAL == 0) {
207  WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
208  myTotalMaxLoad = 0.;
209  myTotalSpread = 0.;
210  }
211 #endif
212  }
213 #endif
214  if (deleteFinished) {
215  for (Task* task : myFinishedTasks) {
216  delete task;
217  }
218  }
219  ProcessError* toRaise = myException;
220  myException = nullptr;
221  myFinishedTasks.clear();
222  myRunningIndex = 0;
223  myMutex.unlock();
224  if (toRaise != nullptr) {
225  throw* toRaise;
226  }
227  }
228 
236  bool isFull() const {
237  return myRunningIndex - (int)myFinishedTasks.size() >= size();
238  }
239 
244  int size() const {
245  return (int)myWorkers.size();
246  }
247 
249  void lock() {
250  myPoolMutex.lock();
251  }
252 
254  void unlock() {
255  myPoolMutex.unlock();
256  }
257 
258  private:
260  std::vector<FXWorkerThread*> myWorkers;
262  FXMutex myMutex;
264  FXMutex myPoolMutex;
266  FXCondition myCondition;
268  std::list<Task*> myFinishedTasks;
273 #ifdef WORKLOAD_PROFILING
274  int myNumBatches;
277  double myTotalMaxLoad;
279  double myTotalSpread;
281  std::chrono::high_resolution_clock::time_point myProfileStart;
282 #endif
283  };
284 
285 public:
292  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
293 #ifdef WORKLOAD_PROFILING
294  , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
295 #endif
296  {
297  pool.addWorker(this);
298  start();
299  }
300 
305  virtual ~FXWorkerThread() {
306  stop();
307 #ifdef WORKLOAD_PROFILING
308  const double load = 100. * myTotalBusyTime / myTotalTime;
309  WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
310  " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
311  "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
312 #endif
313  }
314 
319  void add(Task* t) {
320  myMutex.lock();
321  myTasks.push_back(t);
322  myCondition.signal();
323  myMutex.unlock();
324  }
325 
332  FXint run() {
333  while (!myStopped) {
334  myMutex.lock();
335  while (!myStopped && myTasks.empty()) {
336  myCondition.wait(myMutex);
337  }
338  if (myStopped) {
339  myMutex.unlock();
340  break;
341  }
342  myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
343  myMutex.unlock();
344  try {
345  for (Task* const t : myCurrentTasks) {
346 #ifdef WORKLOAD_PROFILING
347  const auto before = std::chrono::high_resolution_clock::now();
348 #endif
349  t->run(this);
350 #ifdef WORKLOAD_PROFILING
351  const auto after = std::chrono::high_resolution_clock::now();
352  myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
353  myCounter++;
354 #endif
355  }
356  } catch (ProcessError& e) {
357  myPool.setException(e);
358  }
360  }
361  return 0;
362  }
363 
368  void stop() {
369  myMutex.lock();
370  myStopped = true;
371  myCondition.signal();
372  myMutex.unlock();
373  join();
374  }
375 
376 #ifdef WORKLOAD_PROFILING
377  void startProfile() {
378  myBusyTime = 0;
379  }
380 
381  double endProfile(const long long int time) {
382  myTotalTime += time;
383  myTotalBusyTime += myBusyTime;
384  return time == 0 ? 100. : 100. * myBusyTime / time;
385  }
386 #endif
387 
388 private:
392  FXMutex myMutex;
394  FXCondition myCondition;
396  std::list<Task*> myTasks;
398  std::list<Task*> myCurrentTasks;
400  bool myStopped;
401 #ifdef WORKLOAD_PROFILING
402  int myCounter;
405  long long int myBusyTime;
407  long long int myTotalBusyTime;
409  long long int myTotalTime;
410 #endif
411 };
412 
413 
414 #endif
FXWorkerThread::myCurrentTasks
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
Definition: FXWorkerThread.h:398
FXWorkerThread::Pool::myRunningIndex
int myRunningIndex
the running index for the next task
Definition: FXWorkerThread.h:270
ToString.h
MIN2
T MIN2(T a, T b)
Definition: StdDefs.h:74
FXWorkerThread::Pool::myMutex
FXMutex myMutex
the internal mutex for the task list
Definition: FXWorkerThread.h:262
FXWorkerThread::Pool::myFinishedTasks
std::list< Task * > myFinishedTasks
list of finished tasks
Definition: FXWorkerThread.h:268
MsgHandler.h
FXWorkerThread::myPool
Pool & myPool
the pool for this thread
Definition: FXWorkerThread.h:390
FXWorkerThread::Pool::~Pool
virtual ~Pool()
Destructor.
Definition: FXWorkerThread.h:121
FXWorkerThread::stop
void stop()
Stops the thread.
Definition: FXWorkerThread.h:368
FXWorkerThread::Pool::size
int size() const
Returns the number of threads in the pool.
Definition: FXWorkerThread.h:244
MAX2
T MAX2(T a, T b)
Definition: StdDefs.h:80
FXWorkerThread::Pool::waitAll
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
Definition: FXWorkerThread.h:186
FXWorkerThread::Pool::addFinished
void addFinished(std::list< Task * > &tasks)
Adds the given tasks to the list of finished tasks.
Definition: FXWorkerThread.h:170
FXWorkerThread::Pool::isFull
bool isFull() const
Checks whether there are currently at least as many pending tasks as threads.
Definition: FXWorkerThread.h:236
FXWorkerThread::Pool::addWorker
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Definition: FXWorkerThread.h:138
FXWorkerThread::myStopped
bool myStopped
whether we are still running
Definition: FXWorkerThread.h:400
FXWorkerThread::run
FXint run()
Main execution method of this thread.
Definition: FXWorkerThread.h:332
FXWorkerThread::Pool::unlock
void unlock()
unlocks the pool mutex
Definition: FXWorkerThread.h:254
ProcessError
Definition: UtilExceptions.h:40
FXWorkerThread::myCondition
FXCondition myCondition
the semaphore when waiting for new tasks
Definition: FXWorkerThread.h:394
UtilExceptions.h
FXWorkerThread::Pool::lock
void lock()
locks the pool mutex
Definition: FXWorkerThread.h:249
FXWorkerThread::Pool::Pool
Pool(int numThreads=0)
Constructor.
Definition: FXWorkerThread.h:97
FXWorkerThread::~FXWorkerThread
virtual ~FXWorkerThread()
Destructor.
Definition: FXWorkerThread.h:305
FXWorkerThread::Task::run
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
FXWorkerThread::Task::myIndex
int myIndex
the index of the task, valid only after the task has been added to the pool
Definition: FXWorkerThread.h:82
FXWorkerThread::FXWorkerThread
FXWorkerThread(Pool &pool)
Constructor.
Definition: FXWorkerThread.h:292
toString
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition: ToString.h:48
FXWorkerThread::Task::~Task
virtual ~Task()
Destructor.
Definition: FXWorkerThread.h:59
FXWorkerThread::myMutex
FXMutex myMutex
the mutex for the task list
Definition: FXWorkerThread.h:392
FXWorkerThread::Pool
A pool of worker threads which distributes the tasks and collects the results.
Definition: FXWorkerThread.h:89
FXWorkerThread::Pool::setException
void setException(ProcessError &e)
Definition: FXWorkerThread.h:177
WORKLOAD_INTERVAL
#define WORKLOAD_INTERVAL
Definition: FXWorkerThread.h:24
FXWorkerThread::Pool::add
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index....
Definition: FXWorkerThread.h:148
FXWorkerThread::Pool::myCondition
FXCondition myCondition
the semaphore to wait on for finishing all tasks
Definition: FXWorkerThread.h:266
FXWorkerThread::add
void add(Task *t)
Adds the given task to this thread to be calculated.
Definition: FXWorkerThread.h:319
config.h
FXWorkerThread::Pool::clear
void clear()
Stops and deletes all worker threads.
Definition: FXWorkerThread.h:127
FXWorkerThread::Pool::myWorkers
std::vector< FXWorkerThread * > myWorkers
the current worker threads
Definition: FXWorkerThread.h:260
FXWorkerThread::Task
Abstract superclass of a task to be run with an index to keep track of pending tasks.
Definition: FXWorkerThread.h:56
FXWorkerThread::Pool::myException
ProcessError * myException
the exception from a child thread
Definition: FXWorkerThread.h:272
FXWorkerThread::Pool::myPoolMutex
FXMutex myPoolMutex
the pool mutex for external sync
Definition: FXWorkerThread.h:264
FXWorkerThread::myTasks
std::list< Task * > myTasks
the list of pending tasks
Definition: FXWorkerThread.h:396
FXWorkerThread::Task::setIndex
void setIndex(const int newIndex)
Sets the running index of this task.
Definition: FXWorkerThread.h:77
WRITE_MESSAGE
#define WRITE_MESSAGE(msg)
Definition: MsgHandler.h:240
FXWorkerThread
A thread repeatedly calculating incoming tasks.
Definition: FXWorkerThread.h:49