Build tool
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

parallel.cc 2.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #include "parallel.h"
  2. #include <thread>
  3. #include <condition_variable>
  4. #include <memory>
  5. #include <vector>
  6. namespace parallel {
  7. struct Worker {
  8. bool running;
  9. std::function<void(void)> func;
  10. std::condition_variable cond;
  11. std::thread thread;
  12. Worker *next_worker;
  13. };
  14. static std::condition_variable workers_cond;
  15. static std::mutex workers_mut;
  16. static std::unique_ptr<Worker[]> workers;
  17. static Worker *head = nullptr;
  18. static int threads_running = 0;
  19. static int max_threads = 0;
  20. static int num_threads = 0;
  21. static int waiting = 0;
  22. static void workerFunc(Worker *w) {
  23. while (true) {
  24. std::unique_lock<std::mutex> lock(workers_mut);
  25. w->cond.wait(lock, [w] { return (bool)w->func || !w->running; });
  26. if (!w->running)
  27. break;
  28. lock.unlock();
  29. w->func();
  30. lock.lock();
  31. w->func = nullptr;
  32. w->next_worker = head;
  33. head = w;
  34. threads_running -= 1;
  35. lock.unlock();
  36. workers_cond.notify_one();
  37. }
  38. }
  39. ParallelContext init(int num) {
  40. std::unique_lock<std::mutex> lock(workers_mut);
  41. if (num <= 1) {
  42. return ParallelContext{};
  43. }
  44. max_threads = num;
  45. workers.reset(new Worker[num]);
  46. // Don't spawn threads; they're spawned as needed
  47. return ParallelContext{};
  48. }
  49. ParallelContext::~ParallelContext() {
  50. std::unique_lock<std::mutex> lock(workers_mut);
  51. workers_cond.wait(lock, [] { return threads_running == 0 && waiting == 0; });
  52. Worker *w = head;
  53. while (w) {
  54. w->running = false;
  55. lock.unlock();
  56. w->cond.notify_one();
  57. w->thread.join();
  58. lock.lock();
  59. w = w->next_worker;
  60. }
  61. workers.reset();
  62. head = nullptr;
  63. num_threads = 0;
  64. max_threads = 0;
  65. }
  66. void run(std::function<void(void)> func) {
  67. std::unique_lock<std::mutex> lock(workers_mut);
  68. if (!workers) {
  69. func();
  70. return;
  71. }
  72. if (head == nullptr && num_threads < max_threads) {
  73. Worker *w = &workers[num_threads++];
  74. w->next_worker = head;
  75. head = w;
  76. w->running = true;
  77. w->thread = std::thread(workerFunc, w);
  78. }
  79. waiting += 1;
  80. workers_cond.wait(lock, [] { return head != nullptr; });
  81. Worker *w = head;
  82. head = head->next_worker;
  83. threads_running += 1;
  84. waiting -= 1;
  85. // Launch!
  86. w->func = std::move(func);
  87. lock.unlock();
  88. w->cond.notify_one();
  89. }
  90. int coreCount() {
  91. return std::thread::hardware_concurrency();
  92. }
  93. }