123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- #include "parallel.h"
-
- #include <thread>
- #include <condition_variable>
- #include <memory>
- #include <vector>
-
- namespace parallel {
-
/// Per-thread bookkeeping for one pool worker.
///
/// Fields carry default initializers so that a slot that has been allocated
/// but not yet started (see `new Worker[num]` in init()) is in a well-defined
/// "stopped, unlinked, no task" state instead of holding indeterminate values.
struct Worker {
    /// Cleared to ask the worker thread to leave its loop.
    bool running = false;
    /// Task handed to this worker; empty while the worker has nothing to do.
    std::function<void(void)> func;
    /// Signalled when `func` is assigned or `running` is cleared.
    std::condition_variable cond;

    /// The OS thread executing this worker's loop (not joinable until started).
    std::thread thread;
    /// Next entry in the singly linked list of idle workers.
    Worker *next_worker = nullptr;
};
-
- static std::condition_variable workers_cond;
- static std::mutex workers_mut;
- static std::unique_ptr<Worker[]> workers;
- static Worker *head = nullptr;
- static int threads_running = 0;
- static int max_threads = 0;
- static int num_threads = 0;
- static int waiting = 0;
-
- static void workerFunc(Worker *w) {
- while (true) {
- std::unique_lock<std::mutex> lock(workers_mut);
- w->cond.wait(lock, [w] { return (bool)w->func || !w->running; });
- if (!w->running)
- break;
-
- lock.unlock();
- w->func();
- lock.lock();
-
- w->func = nullptr;
- w->next_worker = head;
- head = w;
-
- threads_running -= 1;
- lock.unlock();
- workers_cond.notify_one();
- }
- }
-
- ParallelContext init(int num) {
- std::unique_lock<std::mutex> lock(workers_mut);
- if (num <= 1) {
- return ParallelContext{};
- }
-
- max_threads = num;
- workers.reset(new Worker[num]);
-
- // Don't spawn threads; they're spawned as needed
- return ParallelContext{};
- }
-
- ParallelContext::~ParallelContext() {
- std::unique_lock<std::mutex> lock(workers_mut);
- workers_cond.wait(lock, [] { return threads_running == 0 && waiting == 0; });
-
- Worker *w = head;
- while (w) {
- w->running = false;
- lock.unlock();
- w->cond.notify_one();
- w->thread.join();
- lock.lock();
- w = w->next_worker;
- }
-
- workers.reset();
- head = nullptr;
- num_threads = 0;
- max_threads = 0;
- }
-
- void run(std::function<void(void)> func) {
- std::unique_lock<std::mutex> lock(workers_mut);
- if (!workers) {
- func();
- return;
- }
-
- if (head == nullptr && num_threads < max_threads) {
- Worker *w = &workers[num_threads++];
- w->next_worker = head;
- head = w;
-
- w->running = true;
- w->thread = std::thread(workerFunc, w);
- }
-
- waiting += 1;
- workers_cond.wait(lock, [] { return head != nullptr; });
- Worker *w = head;
- head = head->next_worker;
- threads_running += 1;
- waiting -= 1;
-
- // Launch!
- w->func = std::move(func);
- lock.unlock();
- w->cond.notify_one();
- }
-
/// Reports the number of concurrent threads the implementation says it
/// supports, as given by std::thread::hardware_concurrency().
///
/// @return  That value converted to int; the standard allows it to be 0
///          when the count cannot be determined.
int coreCount() {
    const unsigned int reported = std::thread::hardware_concurrency();
    return static_cast<int>(reported);
}
-
- }
|