ThreadPool.h 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. #ifndef THREAD_POOL_H
  2. #define THREAD_POOL_H
  3. #include <vector>
  4. #include <queue>
  5. #include <memory>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <future>
  10. #include <functional>
  11. #include <stdexcept>
  // A fixed-size pool of worker threads that run queued tasks in FIFO order.
  //
  // Tasks are submitted with enqueue(), which returns a std::future for the
  // task's result. The destructor lets the workers drain every task that is
  // still queued and then joins them.
  //
  // Note: a pool constructed with 0 threads accepts tasks but never runs them,
  // so waiting on a future obtained from such a pool blocks forever.
  class ThreadPool
  {
  public:
      // Starts `threads` worker threads. If starting a thread throws, the
      // workers that were already started are stopped and joined before the
      // exception is rethrown.
      ThreadPool(size_t threads);

      // Worker threads hold a pointer to this object, and std::mutex /
      // std::thread members cannot be copied; state that explicitly.
      ThreadPool(const ThreadPool&) = delete;
      ThreadPool& operator=(const ThreadPool&) = delete;

      // Queues the call f(args...) and returns a future for its result.
      // Throws std::runtime_error if the pool has already been stopped.
      template<class F, class... Args>
      auto enqueue(F&& f, Args&&... args)
          -> std::future<typename std::result_of<F(Args...)>::type>;

      // Stops the pool, waits for queued tasks to finish, and joins all workers.
      ~ThreadPool();

      // Number of worker threads owned by the pool.
      size_t getThreadNum() const {
          return workers.size();
      }

  private:
      // Body of every worker thread: pop and run tasks until told to stop.
      void workerLoop();
      // Sets the stop flag, wakes all workers, and joins them.
      void shutdown();

      // need to keep track of threads so we can join them
      std::vector< std::thread > workers;
      // the task queue
      std::queue< std::function<void()> > tasks;
      // synchronization: queue_mutex guards both `tasks` and `stop`
      std::mutex queue_mutex;
      std::condition_variable condition;
      bool stop;
  };

  // Runs on each worker thread. Sleeps until there is a task or the pool is
  // stopping; exits only once stop is set AND the queue is empty, so pending
  // tasks are still executed during shutdown.
  inline void ThreadPool::workerLoop()
  {
      for (;;)
      {
          std::function<void()> task;
          {
              std::unique_lock<std::mutex> lock(queue_mutex);
              condition.wait(lock,
                  [this] { return stop || !tasks.empty(); });
              if (stop && tasks.empty())
                  return;
              task = std::move(tasks.front());
              tasks.pop();
          }
          // Run the task outside the lock so other workers can dequeue.
          task();
      }
  }

  // Shared by the destructor and the constructor's failure path.
  inline void ThreadPool::shutdown()
  {
      {
          std::lock_guard<std::mutex> lock(queue_mutex);
          stop = true;
      }
      condition.notify_all();
      for (std::thread& worker : workers)
      {
          if (worker.joinable())
              worker.join();
      }
  }

  // the constructor just launches some amount of workers
  inline ThreadPool::ThreadPool(size_t threads) : stop(false)
  {
      // Reserve first: no worker is running yet, so a failure here is harmless.
      workers.reserve(threads);
      try
      {
          for (size_t i = 0; i < threads; ++i)
              workers.emplace_back([this] { workerLoop(); });
      }
      catch (...)
      {
          // Destroying a joinable std::thread calls std::terminate, so the
          // workers that did start must be joined before `workers` is destroyed.
          shutdown();
          throw;
      }
  }

  // add new work item to the pool
  template<class F, class... Args>
  auto ThreadPool::enqueue(F&& f, Args&&... args)
      -> std::future<typename std::result_of<F(Args...)>::type>
  {
      using return_type = typename std::result_of<F(Args...)>::type;

      // std::function requires a copyable callable, and std::packaged_task is
      // move-only, so the task is held through a shared_ptr.
      auto task = std::make_shared< std::packaged_task<return_type()> >(
          std::bind(std::forward<F>(f), std::forward<Args>(args)...)
      );
      std::future<return_type> res = task->get_future();
      {
          std::lock_guard<std::mutex> lock(queue_mutex);
          // don't allow enqueueing after stopping the pool
          if (stop)
              throw std::runtime_error("enqueue on stopped ThreadPool");
          tasks.emplace([task]() { (*task)(); });
      }
      // Notify after releasing the lock so the woken worker can take it at once.
      condition.notify_one();
      return res;
  }

  // the destructor joins all threads
  inline ThreadPool::~ThreadPool()
  {
      shutdown();
  }
  87. #endif