# BS_thread_pool 源码解读 源码路径: https://github.com/bshoshany/thread-pool/blob/master/tests/BS_thread_pool_test.cpp ## 线程池 线程池(Thread Pool) 是一种并发编程的设计模式,用于管理和复用多个线程,以便高效地执行大量任务。线程池的核心思想是预先创建一组线程,并将任务分配给这些线程执行,而不是为每个任务都创建一个新的线程。通过这种方式,线程池可以**减少线程创建和销毁的开销**,提高系统的性能和资源利用率。 - 线程创建和销毁的开销: - 创建和销毁线程是一个相对昂贵的操作,涉及到操作系统资源的分配和回收。频繁地创建和销毁线程会导致系统性能下降。 - 例如,假设你有一个任务需要执行 1000 次,如果每次任务都创建一个新线程,那么系统将创建 1000 个线程,这会导致大量的资源消耗和上下文切换开销。 - 资源管理问题: - 如果不加以控制,系统中可能会同时存在大量的线程,导致系统资源(如内存、CPU)被过度占用,甚至可能导致系统崩溃。 - 例如,在一个 Web 服务器中,如果每个请求都创建一个新线程来处理,那么在并发请求量很大的情况下,系统可能会因为线程过多而耗尽内存。 - 任务调度的复杂性: - 手动管理多个线程的创建、销毁和任务分配会增加代码的复杂性,容易引入 bug。 - 例如,如果你需要手动管理 100 个线程的任务分配和同步,代码会变得非常复杂且难以维护。 ## 简易线程池 ```cpp #include #include #include #include #include #include #include #include class ThreadPool { public: ThreadPool(size_t numThreads); ~ThreadPool(); void enqueue(const std::function &task); private: std::vector workers; std::queue> tasks; std::mutex queueMutex; std::condition_variable condition; std::atomic stop; void worker(); }; ThreadPool::ThreadPool(size_t numThreads) : stop(false) { for (size_t i = 0; i < numThreads; ++i) { workers.emplace_back([this] { worker(); }); } } ThreadPool::~ThreadPool() { stop = true; condition.notify_all(); for (std::thread &worker : workers) { worker.join(); } } void ThreadPool::enqueue(const std::function &task) { { std::unique_lock lock(queueMutex); tasks.push(task); } condition.notify_one(); } void ThreadPool::worker() { while (true) { std::function task; { std::unique_lock lock(queueMutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); } task(); } } int main() { ThreadPool pool(4); for (int i = 0; i < 8; ++i) { pool.enqueue([i] { std::cout << "Processing task " << i << std::endl; }); } std::this_thread::sleep_for(std::chrono::seconds(2)); return 0; } ``` - `workers`:一个 `std::vector`,存储线程对象 - `tasks`:一个 `std::queue`,存储待执行的任务 - `queueMutex`:互斥锁,用于保护对 `tasks` 队列的访问,避免多线程竞争 - `condition`:条件变量,用于通知线程有任务可以执行 - `stop`:一个 `std::atomic` 标志,用于标记线程池是否已经停止。线程池在被销毁时会将 `stop` 设置为 `true` 在 `ThreadPool` 创建的时候,就会创建指定数量的线程,并进入阻塞状态,在 `enqueue` 添加任务的时候调用 `condition.notify_one()` 唤醒一个等待中的线程,通知它有新任务可以处理 `condition.notify_one()` 随机唤醒其中一个线程,`condition.notify_all()` 会唤醒所有的线程 ## BS_thread_pool 在该项目中,线程池的定义为 `thread_pool` ```cpp template class [[nodiscard]] thread_pool { // ........... }; ``` 这里 `OptFlags` 用于定义线程池的作用 ```cpp using opt_t = std::uint8_t; enum tp : opt_t { none = 0, priority = 1 << 0, pause = 1 << 2, wait_deadlock_checks = 1 << 3 }; ``` 这些分别对应四种不同功能的线程池 ```cpp using light_thread_pool = thread_pool; using priority_thread_pool = thread_pool; using pause_thread_pool = thread_pool; using wdc_thread_pool = thread_pool; ``` 在 `thread_pool` 模板类中,根据传入模板值进行参数的设置 ```cpp static constexpr bool priority_enabled = (OptFlags & tp::priority) != 0; static constexpr bool pause_enabled = (OptFlags & tp::pause) != 0; static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != 0; ``` 在定义 `light_thread_pool`、`priority_thread_pool` 等的时候就已经将其属性设置好,在后续类的使用中直接可以通过属性判断进行功能判断 ```cpp std::conditional_t, std::queue> tasks; std::conditional_t paused = {}; ``` 比如上述代码定义 `tasks` 任务列表,根据 `priority_enabled` 是否根据优先级排序来选择定义的 `tasks` 的队列类型是 `priority_queue` 还是 `queue`,这个类型在编译期就确定了 ### submit_task 函数 关于 `std::future` `std::future` 是 C++11 标准库中引入的一种用于表示异步操作结果的机制。用于在请求处理结果的线程和执行任务的线程之间进行同步。`std::future` 提供了一个简洁的方式来获取异步计算的结果 使用 `std::future::get()` 可以用于获取结果,如果结果尚未准备好,则将线程阻塞,直到结果可用,该函数**只能被调用一次**,后续调用将会导致一场 使用 `std::future::valid()` 用于检查 `std::future` 对象是否有与之关联的共享状态 使用 `std::future_status` 用于查询任务的状态,比如是否超时等 ```cpp #include #include #include int compute() { std::this_thread::sleep_for(std::chrono::seconds(3)); return 42; } int main() { // 使用 std::async 启动一个异步任务,返回 std::future std::future result = std::async(std::launch::async, compute); // 模拟其他工作 std::cout << "Doing other work...\n"; std::this_thread::sleep_for(std::chrono::seconds(1)); // 获取异步任务的结果 std::cout << "Waiting for result...\n"; int value = result.get(); // 阻塞直到 compute 完成并返回结果 std::cout << "Result: " << value << "\n"; return 0; } ``` 上述代码使用 `std::async` 创建一个异步任务,用 `std::future` 表示结果是一个 int 类型的值 关于 `std::promise` `std::promise` 是 C++11 标准引入的另一种同步机制,用于在线程之间传递数据。它与 `std::future` 密切相关,提供了一种方式来设置数据的结果,`std::future` 则用于获取这一结果 `std::promise` 用于在一个线程中设置某个异步操作的结果。你可以把它看作是 **承诺** 去提供一个结果,它保证返回值是一个它定义的类型 每个 `std::promise` 对象可以通过 `get_future()` 方法生成一个对应的 `std::future` 对象。这个 `std::future` 可以在另一个线程中使用,以获取 `std::promise` 所设置的结果 ```cpp #include #include #include // 线程函数,执行一些计算并将结果设置到 promise 中 void calculate(std::promise& p, int value) { try { if (value < 0) { throw std::invalid_argument("Negative value not allowed"); } // 这里模拟一些计算 int result = value * 2; std::this_thread::sleep_for(std::chrono::seconds(2)); // 设置计算结果 p.set_value(result); } catch (...) { // 如果发生异常,设置异常状态 p.set_exception(std::current_exception()); } } int main() { // 创建一个 promise 对象 std::promise p; // 获取与 promise 相关联的 future 对象 std::future f = p.get_future(); // 启动线程,并将 promise 传递给线程函数 std::thread t(calculate, std::ref(p), 10); try { // 从 future 获取计算结果,阻塞直到结果可用 int result = f.get(); std::cout << "Result from the thread: " << result << std::endl; } catch (const std::exception& e) { // 处理线程中抛出的异常 std::cout << "Exception: " << e.what() << std::endl; } // 等待线程完成 t.join(); return 0; } ``` 通过上面代码介绍了 `std::future` 和 `std::promise` 的作用,接下来就是 `submit_task` 函数的源代码了 ```cpp template >> [[nodiscard]] std::future submit_task(F&& task, const priority_t priority = 0) { #ifdef __cpp_lib_move_only_function std::promise promise; #define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise. #else const std::shared_ptr> promise = std::make_shared>(); #define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise-> #endif std::future future = BS_THREAD_POOL_PROMISE_MEMBER_ACCESS get_future(); detach_task( [task = std::forward(task), promise = std::move(promise)]() mutable { #ifdef __cpp_exceptions try { #endif if constexpr (std::is_void_v) { task(); BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value(); } else { BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value(task()); } #ifdef __cpp_exceptions } catch (...) { try { BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_exception(std::current_exception()); } catch (...) { } } #endif }, priority); return future; } ``` 这个函数是一个模板函数, 用于将任务提交到线程池的任务队列中, 并返回一个 `std::future` 对象以获取任务的结果。它异步地执行一个函数,并在稍后获取结果或等待任务完成 该方法封装了一个 lambda 表达式,将传入的 `task` 封装了一层,创建了自己的 `task`,再将自己的 `task` 和 `priority` 作为参数传递给 `detach_task` ```cpp template void detach_task(F&& task, const priority_t priority = 0) { { const std::scoped_lock tasks_lock(tasks_mutex); if constexpr (priority_enabled) tasks.emplace(std::forward(task), priority); else tasks.emplace(std::forward(task)); } task_available_cv.notify_one(); } ```