Prechádzať zdrojové kódy

根据 submit_task 介绍 future 和 promise

nicetry12138 11 mesiacov pred
rodič
commit
e5f1073dd3
1 zmenil súbory, kde vykonal 179 pridanie a 2 odobranie
  1. 179 2
      源码解读/BS_thread_pool/README.md

+ 179 - 2
源码解读/BS_thread_pool/README.md

@@ -134,7 +134,7 @@ enum tp : opt_t
 };
 ```
 
-这些分别对应种不同功能的线程池
+这些分别对应种不同功能的线程池
 
 ```cpp
 using light_thread_pool = thread_pool<tp::none>;
@@ -143,7 +143,7 @@ using pause_thread_pool = thread_pool<tp::pause>;
 using wdc_thread_pool = thread_pool<tp::wait_deadlock_checks>;
 ```
 
-在 `thread_pool` 类中,根据模板进行值的设置
+在 `thread_pool` 模板类中,根据传入模板值进行参数的设置
 
 ```cpp
 static constexpr bool priority_enabled = (OptFlags & tp::priority) != 0;
@@ -151,3 +151,180 @@ static constexpr bool pause_enabled = (OptFlags & tp::pause) != 0;
 static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != 0;
 ```
 
+在定义 `light_thread_pool`、`priority_thread_pool` 等的时候就已经将其属性设置好,在后续类的使用中直接可以通过属性判断进行功能判断
+
+```cpp
+std::conditional_t<priority_enabled, std::priority_queue<pr_task>, std::queue<task_t>> tasks;
+std::conditional_t<pause_enabled, bool, std::monostate> paused = {};
+```
+
+比如上述代码定义 `tasks` 任务列表,根据 `priority_enabled` 是否根据优先级排序来选择定义的 `tasks` 的队列类型是 `priority_queue` 还是 `queue`,这个类型在编译期就确定了
+
+### submit_task 函数
+
+关于 `std::future`
+
+`std::future` 是 C++11 标准库中引入的一种用于表示异步操作结果的机制。用于在请求处理结果的线程和执行任务的线程之间进行同步。`std::future` 提供了一个简洁的方式来获取异步计算的结果
+
+使用 `std::future::get()` 可以用于获取结果,如果结果尚未准备好,则将线程阻塞,直到结果可用,该函数**只能被调用一次**,后续调用将会导致一场
+
+使用 `std::future::valid()` 用于检查 `std::future` 对象是否有与之关联的共享状态
+
+使用 `std::future_status` 用于查询任务的状态,比如是否超时等
+
+```cpp
+#include <iostream>
+#include <future>
+#include <chrono>
+
+int compute() {
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+    return 42;
+}
+
+int main() {
+    // 使用 std::async 启动一个异步任务,返回 std::future
+    std::future<int> result = std::async(std::launch::async, compute);
+
+    // 模拟其他工作
+    std::cout << "Doing other work...\n";
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    // 获取异步任务的结果
+    std::cout << "Waiting for result...\n";
+    int value = result.get(); // 阻塞直到 compute 完成并返回结果
+    std::cout << "Result: " << value << "\n";
+    
+    return 0;
+}
+```
+
+上述代码使用 `std::async` 创建一个异步任务,用 `std::future<int>` 表示结果是一个 int 类型的值
+
+关于 `std::promise`
+
+`std::promise` 是 C++11 标准引入的另一种同步机制,用于在线程之间传递数据。它与 `std::future` 密切相关,提供了一种方式来设置数据的结果,`std::future` 则用于获取这一结果
+
+`std::promise` 用于在一个线程中设置某个异步操作的结果。你可以把它看作是 **承诺** 去提供一个结果,它保证返回值是一个它定义的类型
+
+每个 `std::promise` 对象可以通过 `get_future()` 方法生成一个对应的 `std::future` 对象。这个 `std::future` 可以在另一个线程中使用,以获取 `std::promise` 所设置的结果
+
+```cpp
+#include <iostream>
+#include <thread>
+#include <future>
+
+// 线程函数,执行一些计算并将结果设置到 promise 中
+void calculate(std::promise<int>& p, int value) {
+    try {
+        if (value < 0) {
+            throw std::invalid_argument("Negative value not allowed");
+        }
+        
+        // 这里模拟一些计算
+        int result = value * 2;
+        std::this_thread::sleep_for(std::chrono::seconds(2));
+        
+        // 设置计算结果
+        p.set_value(result);
+    } catch (...) {
+        // 如果发生异常,设置异常状态
+        p.set_exception(std::current_exception());
+    }
+}
+
+int main() {
+    // 创建一个 promise 对象
+    std::promise<int> p;
+    
+    // 获取与 promise 相关联的 future 对象
+    std::future<int> f = p.get_future();
+    
+    // 启动线程,并将 promise 传递给线程函数
+    std::thread t(calculate, std::ref(p), 10);
+    
+    try {
+        // 从 future 获取计算结果,阻塞直到结果可用
+        int result = f.get();
+        std::cout << "Result from the thread: " << result << std::endl;
+    } catch (const std::exception& e) {
+        // 处理线程中抛出的异常
+        std::cout << "Exception: " << e.what() << std::endl;
+    }
+    
+    // 等待线程完成
+    t.join();
+    
+    return 0;
+}
+```
+
+通过上面代码介绍了 `std::future` 和 `std::promise` 的作用,接下来就是 `submit_task` 函数的源代码了
+
+
+```cpp
+template <typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
+[[nodiscard]] std::future<R> submit_task(F&& task, const priority_t priority = 0)
+{
+#ifdef __cpp_lib_move_only_function
+    std::promise<R> promise;
+#define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise.
+#else
+    const std::shared_ptr<std::promise<R>> promise = std::make_shared<std::promise<R>>();
+#define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise->
+#endif
+    std::future<R> future = BS_THREAD_POOL_PROMISE_MEMBER_ACCESS get_future();
+    detach_task(
+        [task = std::forward<F>(task), promise = std::move(promise)]() mutable
+        {
+#ifdef __cpp_exceptions
+            try
+            {
+#endif
+                if constexpr (std::is_void_v<R>)
+                {
+                    task();
+                    BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value();
+                }
+                else
+                {
+                    BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value(task());
+                }
+#ifdef __cpp_exceptions
+            }
+            catch (...)
+            {
+                try
+                {
+                    BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_exception(std::current_exception());
+                }
+                catch (...)
+                {
+                }
+            }
+#endif
+        },
+        priority);
+    return future;
+}
+```
+
+这个函数是一个模板函数, 用于将任务提交到线程池的任务队列中, 并返回一个 `std::future` 对象以获取任务的结果。它异步地执行一个函数,并在稍后获取结果或等待任务完成
+
+该方法封装了一个 lambda 表达式,将传入的 `task` 封装了一层,创建了自己的 `task`,再将自己的 `task` 和 `priority` 作为参数传递给 `detach_task`
+
+```cpp
+template <typename F>
+void detach_task(F&& task, const priority_t priority = 0)
+{
+    {
+        const std::scoped_lock tasks_lock(tasks_mutex);
+        if constexpr (priority_enabled)
+            tasks.emplace(std::forward<F>(task), priority);
+        else
+            tasks.emplace(std::forward<F>(task));
+    }
+    task_available_cv.notify_one();
+}
+```
+