|
|
11 mēneši atpakaļ | |
|---|---|---|
| .. | ||
| src | 11 mēneši atpakaļ | |
| README.md | 11 mēneši atpakaļ | |
源码路径: https://github.com/bshoshany/thread-pool/blob/master/tests/BS_thread_pool_test.cpp
线程池(Thread Pool) 是一种并发编程的设计模式,用于管理和复用多个线程,以便高效地执行大量任务。线程池的核心思想是预先创建一组线程,并将任务分配给这些线程执行,而不是为每个任务都创建一个新的线程。通过这种方式,线程池可以减少线程创建和销毁的开销,提高系统的性能和资源利用率。
线程创建和销毁的开销:
资源管理问题:
任务调度的复杂性:
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
class ThreadPool {
public:
ThreadPool(size_t numThreads);
~ThreadPool();
void enqueue(const std::function<void()> &task);
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
std::atomic<bool> stop;
void worker();
};
ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] { worker(); });
}
}
ThreadPool::~ThreadPool() {
stop = true;
condition.notify_all();
for (std::thread &worker : workers) {
worker.join();
}
}
void ThreadPool::enqueue(const std::function<void()> &task) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.push(task);
}
condition.notify_one();
}
void ThreadPool::worker() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.enqueue([i] {
std::cout << "Processing task " << i << std::endl;
});
}
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
workers:一个 std::vector,存储线程对象tasks:一个 std::queue,存储待执行的任务queueMutex:互斥锁,用于保护对 tasks 队列的访问,避免多线程竞争condition:条件变量,用于通知线程有任务可以执行stop:一个 std::atomic<bool> 标志,用于标记线程池是否已经停止。线程池在被销毁时会将 stop 设置为 true在 ThreadPool 创建的时候,就会创建指定数量的线程,并进入阻塞状态,在 enqueue 添加任务的时候调用 condition.notify_one() 唤醒一个等待中的线程,通知它有新任务可以处理
condition.notify_one() 随机唤醒其中一个线程,condition.notify_all() 会唤醒所有的线程
关于 std::future
std::future 是 C++11 标准库中引入的一种用于表示异步操作结果的机制。用于在请求处理结果的线程和执行任务的线程之间进行同步。std::future 提供了一个简洁的方式来获取异步计算的结果
使用 std::future::get() 可以用于获取结果,如果结果尚未准备好,则将线程阻塞,直到结果可用,该函数只能被调用一次,后续调用将会导致一场
使用 std::future::valid() 用于检查 std::future 对象是否有与之关联的共享状态
使用 std::future_status 用于查询任务的状态,比如是否超时等
#include <iostream>
#include <future>
#include <chrono>
int compute() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 42;
}
int main() {
// 使用 std::async 启动一个异步任务,返回 std::future
std::future<int> result = std::async(std::launch::async, compute);
// 模拟其他工作
std::cout << "Doing other work...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
// 获取异步任务的结果
std::cout << "Waiting for result...\n";
int value = result.get(); // 阻塞直到 compute 完成并返回结果
std::cout << "Result: " << value << "\n";
return 0;
}
上述代码使用 std::async 创建一个异步任务,用 std::future<int> 表示结果是一个 int 类型的值
关于 std::promise
std::promise 是 C++11 标准引入的另一种同步机制,用于在线程之间传递数据。它与 std::future 密切相关,提供了一种方式来设置数据的结果,std::future 则用于获取这一结果
std::promise 用于在一个线程中设置某个异步操作的结果。你可以把它看作是 承诺 去提供一个结果,它保证返回值是一个它定义的类型
每个 std::promise 对象可以通过 get_future() 方法生成一个对应的 std::future 对象。这个 std::future 可以在另一个线程中使用,以获取 std::promise 所设置的结果
#include <iostream>
#include <thread>
#include <future>
// 线程函数,执行一些计算并将结果设置到 promise 中
void calculate(std::promise<int>& p, int value) {
try {
if (value < 0) {
throw std::invalid_argument("Negative value not allowed");
}
// 这里模拟一些计算
int result = value * 2;
std::this_thread::sleep_for(std::chrono::seconds(2));
// 设置计算结果
p.set_value(result);
} catch (...) {
// 如果发生异常,设置异常状态
p.set_exception(std::current_exception());
}
}
int main() {
// 创建一个 promise 对象
std::promise<int> p;
// 获取与 promise 相关联的 future 对象
std::future<int> f = p.get_future();
// 启动线程,并将 promise 传递给线程函数
std::thread t(calculate, std::ref(p), 10);
try {
// 从 future 获取计算结果,阻塞直到结果可用
int result = f.get();
std::cout << "Result from the thread: " << result << std::endl;
} catch (const std::exception& e) {
// 处理线程中抛出的异常
std::cout << "Exception: " << e.what() << std::endl;
}
// 等待线程完成
t.join();
return 0;
}
std::common_type_t 是 C++ 标准库中提供的一个类型特性,用于确定一组类型的通用类型。它会尝试通过类型转换规则找到一个能够容纳所有给定类型值的类型
std::common_type_t 就是这个类型本身int,另一个是 double,std::common_type_t 会选择 double 作为通用类型,因为 double 可以表示所有 int 值而不会丢失精度std::is_signed_v 和 std::is_unsigned_v 用于判断类型是否 有符合 和 无符号 的,如果是 无符号类型,使用 std::is_unsigned_v 返回 true,使用 std::is_signed_v 返回 false,反之亦然
std::enable_if_t 是一种用于 SFINAE(Substitution Failure Is Not An Error)的工具。它允许在模板参数中进行条件编译,如果条件为 true,则 std::enable_if_t 定义一个类型;如果条件为 false,则不定义类型,使模板特化失效
下面是 BS_thread_pool 对上述模板的统一使用
// 通用模板
template <typename T1, typename T2, typename Enable = void>
struct common_index_type
{
using type = std::common_type_t<T1, T2>;
};
// 两个有符号整数
template <typename T1, typename T2>
struct common_index_type<T1, T2, std::enable_if_t<std::is_signed_v<T1> && std::is_signed_v<T2>>>
{
using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
};
// 两个无符号整数
template <typename T1, typename T2>
struct common_index_type<T1, T2, std::enable_if_t<std::is_unsigned_v<T1> && std::is_unsigned_v<T2>>>
{
using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
};
// 一个有符号 一个无符号整数
template <typename T1, typename T2>
struct common_index_type<T1, T2, std::enable_if_t<(std::is_signed_v<T1> && std::is_unsigned_v<T2>) || (std::is_unsigned_v<T1> && std::is_signed_v<T2>)>>
{
using S = std::conditional_t<std::is_signed_v<T1>, T1, T2>;
using U = std::conditional_t<std::is_unsigned_v<T1>, T1, T2>;
static constexpr std::size_t larger_size = (sizeof(S) > sizeof(U)) ? sizeof(S) : sizeof(U);
using type = std::conditional_t<larger_size <= 4,
std::conditional_t<larger_size == 1 || (sizeof(S) == 2 && sizeof(U) == 1), std::int16_t, std::conditional_t<larger_size == 2 || (sizeof(S) == 4 && sizeof(U) < 4), std::int32_t, std::int64_t>>,
std::conditional_t<sizeof(U) == 8, std::uint64_t, std::int64_t>>;
};
使用模板特化的功能,分别定义四种不同的情况下使用的 common_index_type
sizeof(T) 更大,可以保证不会被截断,进而保证正确性int16_t、int32_t或 int64_t;对于更大的整数,选择 uint64_t 或 int64_t#define BS_THREAD_POOL_IF_PAUSE_ENABLED template <bool P = pause_enabled, typename = std::enable_if_t<P>>
#define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) typename F, typename = std::enable_if_t<std::is_invocable_v<F> || std::is_invocable_v<F, std::size_t>>
class thread_pool
{
// ...
BS_THREAD_POOL_IF_PAUSE_ENABLED
[[nodiscard]] bool is_paused() const
{
const std::scoped_lock tasks_lock(tasks_mutex);
return paused;
}
// ...
};
以上面这段代码为例,结合 BS_THREAD_POOL_IF_PAUSE_ENABLED 宏的使用
如果线程池没有启用了暂停功能,则 pause_enable = false,那么 P = false,那么 std::enable_if_t<false> 会通过 SFINAE 机制,自动忽略使用了该宏的特定成员函数或模板,也就是不会生成 pause() 和 unpause() 代码,减少了无意义的接口支持
std::enable_if_t<P> 是一个条件模板工具
true,std::enable_if_t<P> 将生成一个有效的类型(通常为 void)false,std::enable_if_t<P> 会导致模板实例化失败SFINAE 全称是 Substitution Failure Is Not An Error,替换失败不是错误
那么 std::enable_if_t 模板实例化失败,并不会报错,只是不生成,会忽略这个模板实例化
std::is_invocable_v 是 C++17 标准中引入的一个类型特性工具,用于检查一个类型是否可以作为函数调用。这在模板编程中非常有用,特别是在需要检查某个对象是否可以用特定的参数进行调用的情况下
std::is_invocable_v<F>:检查 F 是否是无参数的可调用对象(例如一个 lambda 或普通函数 void init())std::is_invocable_v<F, std::size_t>:检查 F 是否是可以接受一个 std::size_t 类型参数的可调用对象(例如参数为线程索引的初始化函数 void init(std::size_t))#include <type_traits>
#include <iostream>
#include <functional>
int add(int a, int b) {
return a + b;
}
int main() {
auto lambda = [](double x) -> double { return x * x; };
// 检查普通函数是否可调用
bool isAddInvocable = std::is_invocable_v<decltype(add), int, int>;
// 检查 lambda 是否可调用
bool isLambdaInvocable = std::is_invocable_v<decltype(lambda), double>;
// 检查带 bind 的函数是否可调用
auto boundFunc = std::bind(add, std::placeholders::_1, 5);
bool isBoundFuncInvocable = std::is_invocable_v<decltype(boundFunc), int>;
std::cout << "Add is invocable: " << isAddInvocable << std::endl; // 输出:1 (true)
std::cout << "Lambda is invocable: " << isLambdaInvocable << std::endl; // 输出:1 (true)
std::cout << "Bound function is invocable: " << isBoundFuncInvocable << std::endl; // 输出:1 (true)
return 0;
}
在该项目中,线程池的定义为 thread_pool
template <opt_t OptFlags = tp::none>
class [[nodiscard]] thread_pool
{
// ...........
};
这里 OptFlags 用于定义线程池的作用
using opt_t = std::uint8_t;
enum tp : opt_t
{
none = 0,
priority = 1 << 0,
pause = 1 << 2,
wait_deadlock_checks = 1 << 3
};
这些分别对应四种不同功能的线程池
using light_thread_pool = thread_pool<tp::none>;
using priority_thread_pool = thread_pool<tp::priority>;
using pause_thread_pool = thread_pool<tp::pause>;
using wdc_thread_pool = thread_pool<tp::wait_deadlock_checks>;
在 thread_pool 模板类中,根据传入模板值进行参数的设置
static constexpr bool priority_enabled = (OptFlags & tp::priority) != 0;
static constexpr bool pause_enabled = (OptFlags & tp::pause) != 0;
static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != 0;
在定义 light_thread_pool、priority_thread_pool 等的时候就已经将其属性设置好,在后续类的使用中直接可以通过属性判断进行功能判断
std::conditional_t<priority_enabled, std::priority_queue<pr_task>, std::queue<task_t>> tasks;
std::conditional_t<pause_enabled, bool, std::monostate> paused = {};
比如上述代码定义 tasks 任务列表,根据 priority_enabled 是否根据优先级排序来选择定义的 tasks 的队列类型是 priority_queue 还是 queue,这个类型在编译期就确定了
上面代码介绍过 std::future 和 std::promise 的作用,接下来就是 submit_task 函数的源代码了
template <typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
[[nodiscard]] std::future<R> submit_task(F&& task, const priority_t priority = 0)
{
#ifdef __cpp_lib_move_only_function
std::promise<R> promise;
#define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise.
#else
const std::shared_ptr<std::promise<R>> promise = std::make_shared<std::promise<R>>();
#define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise->
#endif
std::future<R> future = BS_THREAD_POOL_PROMISE_MEMBER_ACCESS get_future();
detach_task(
[task = std::forward<F>(task), promise = std::move(promise)]() mutable
{
#ifdef __cpp_exceptions
try
{
#endif
if constexpr (std::is_void_v<R>)
{
task();
BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value();
}
else
{
BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value(task());
}
#ifdef __cpp_exceptions
}
catch (...)
{
try
{
BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_exception(std::current_exception());
}
catch (...)
{
}
}
#endif
},
priority);
return future;
}
这个函数是一个模板函数, 用于将任务提交到线程池的任务队列中, 并返回一个 std::future 对象以获取任务的结果。它异步地执行一个函数,并在稍后获取结果或等待任务完成
该方法封装了一个 lambda 表达式,将传入的 task 封装了一层,创建了自己的 task,再将自己的 task 和 priority 作为参数传递给 detach_task
template <typename F>
void detach_task(F&& task, const priority_t priority = 0)
{
{
const std::scoped_lock tasks_lock(tasks_mutex);
if constexpr (priority_enabled)
tasks.emplace(std::forward<F>(task), priority);
else
tasks.emplace(std::forward<F>(task));
}
task_available_cv.notify_one();
}