From 9941d24190ec269d422742bbf0a58e5c1abf06f7 Mon Sep 17 00:00:00 2001 From: Nanako <469449812@qq.com> Date: Mon, 6 Jan 2025 19:26:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E6=9E=90=E6=9E=84=E6=97=B6=E6=89=80=E6=9C=89=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B2=A1=E6=9C=89=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/async/thread_event.h | 33 ++++++++++++++++++++++--------- src/core/async/thread_pool.cpp | 36 +++++++++++++++++++++++----------- src/core/async/thread_pool.h | 21 ++++++++++++-------- 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/src/core/async/thread_event.h b/src/core/async/thread_event.h index 5ffb288..8c84cba 100644 --- a/src/core/async/thread_event.h +++ b/src/core/async/thread_event.h @@ -5,31 +5,46 @@ class thread_event { public: + thread_event() = default; + explicit thread_event(bool manual_reset) : manual_reset(manual_reset) {} + void wait() { std::unique_lock lock(mutex); cv.wait(lock, [this] { return signaled; }); - signaled = false; + if (!manual_reset) { + signaled = false; + } } - void wait(std::chrono::milliseconds timeout) { + bool wait(std::chrono::milliseconds timeout) { std::unique_lock lock(mutex); - cv.wait_for(lock, timeout, [this] { return signaled; }); - signaled = false; + const bool was_signaled = cv.wait_for(lock, timeout, [this] { return signaled; }); + if (was_signaled && !manual_reset) { + signaled = false; + } + return was_signaled; } void signal() { std::lock_guard lock(mutex); signaled = true; - cv.notify_one(); + if (manual_reset) { + cv.notify_all(); + } else { + cv.notify_one(); + } } - void broadcast_signal() { - std::lock_guard lock(mutex); - signaled = true; - cv.notify_all(); + // 重置事件为无信号状态(仅手动复位事件使用) + void reset() { + if (manual_reset) { + std::lock_guard lock(mutex); + signaled = false; + } } private: std::condition_variable cv; std::mutex mutex; bool signaled = false; + bool manual_reset = false; }; diff --git a/src/core/async/thread_pool.cpp b/src/core/async/thread_pool.cpp index 0147eed..57764df 100644 --- a/src/core/async/thread_pool.cpp +++ b/src/core/async/thread_pool.cpp @@ -1,5 +1,7 @@ #include "thread_pool.h" +#include + thread_pool::thread_pool(const size_t num_threads) : stop(false) { // 创建线程 for (std::size_t i = 0; i < num_threads; ++i) { @@ -10,23 +12,35 @@ thread_pool::thread_pool(const size_t num_threads) : stop(false) { thread_pool::~thread_pool() { stop = true; // 唤醒所有线程 - task_available.broadcast_signal(); + condition.notify_all(); + + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } } void thread_pool::worker_thread() { while (true) { - // 如果线程池停止且任务队列为空,则退出 - if (stop && tasks.empty()) { - return; + std::function task; + { + std::unique_lock lock(queue_mutex); + condition.wait(lock, [this] { return stop || !this->tasks.empty(); }); + if (stop && tasks.empty()) { + return; + } + task = std::move(tasks.front()); + tasks.pop(); } - // 从任务队列中取出任务并执行 - if (const auto& task = tasks.pop()) { - task.value()(); - continue; + try { + task(); + } catch (const std::exception& e) { + // 处理任务中的异常,避免线程池崩溃 + std::cerr << "Task exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Task exception: unknown exception" << std::endl; } - - // 如果任务队列为空,则等待任务 - task_available.wait(); } } diff --git a/src/core/async/thread_pool.h b/src/core/async/thread_pool.h index 6a79eb0..2cfb9b4 100644 --- a/src/core/async/thread_pool.h +++ b/src/core/async/thread_pool.h @@ -43,25 +43,30 @@ public: auto submit(F&& in_func, Args&&... in_args) { using return_type = std::invoke_result_t; - auto bind_task = std::make_shared>( + if (stop) { throw std::runtime_error("submit on stopped ThreadPool"); } + + auto task = std::make_shared>( std::bind(std::forward(in_func), std::forward(in_args)...) ); - auto res = bind_task->get_future(); - if (stop) { throw std::runtime_error("submit on stopped ThreadPool"); } + auto res = task->get_future(); - tasks.push([bind_task] { (*bind_task)(); }); - task_available.signal(); + { + std::lock_guard lock(queue_mutex); + tasks.push([task] { (*task)(); }); + } + condition.notify_one(); return res; } private: void worker_thread(); - std::vector threads; - safe_queue> tasks; + std::vector threads; + std::queue> tasks; // 同步原语 - thread_event task_available; + std::mutex queue_mutex; + std::condition_variable condition; std::atomic_bool stop; };