From c59daeaa92774c04dda97b67c8ea7d87a2fead55 Mon Sep 17 00:00:00 2001 From: Nanako <469449812@qq.com> Date: Sun, 5 Jan 2025 22:18:06 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=92=8C=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/async/async_task.cpp | 1 + src/core/async/async_task.h | 5 +++ src/core/async/queued_thread_pool.cpp | 32 +++++++++++++++++++ src/core/async/queued_thread_pool.h | 44 +++++++++++++++++++++++++++ src/core/async/thread_event.h | 35 +++++++++++++++++++++ 5 files changed, 117 insertions(+) create mode 100644 src/core/async/async_task.cpp create mode 100644 src/core/async/async_task.h create mode 100644 src/core/async/queued_thread_pool.cpp create mode 100644 src/core/async/queued_thread_pool.h create mode 100644 src/core/async/thread_event.h diff --git a/src/core/async/async_task.cpp b/src/core/async/async_task.cpp new file mode 100644 index 0000000..6c889c4 --- /dev/null +++ b/src/core/async/async_task.cpp @@ -0,0 +1 @@ +#include "async_task.h" diff --git a/src/core/async/async_task.h b/src/core/async/async_task.h new file mode 100644 index 0000000..b43af9f --- /dev/null +++ b/src/core/async/async_task.h @@ -0,0 +1,5 @@ +#pragma once + +class async_task { + +}; diff --git a/src/core/async/queued_thread_pool.cpp b/src/core/async/queued_thread_pool.cpp new file mode 100644 index 0000000..999dd18 --- /dev/null +++ b/src/core/async/queued_thread_pool.cpp @@ -0,0 +1,32 @@ +#include "queued_thread_pool.h" + +queued_thread_pool::queued_thread_pool(const size_t num_threads) : stop(false) { + // 创建线程 + for (std::size_t i = 0; i < num_threads; ++i) { + threads.emplace_back(&queued_thread_pool::worker_thread, this); + } +} + +queued_thread_pool::~queued_thread_pool() { + stop = true; + // 唤醒所有线程 + task_available.broadcast_signal(); +} + +void queued_thread_pool::worker_thread() { + while (true) { + // 如果线程池停止且任务队列为空,则退出 + if (stop && tasks.empty()) { + return; + } + + // 从任务队列中取出任务并执行 + if (const auto& task = tasks.pop()) { + task.value()(); + continue; + } + + // 如果任务队列为空,则等待任务 + task_available.wait(); + } +} diff --git a/src/core/async/queued_thread_pool.h b/src/core/async/queued_thread_pool.h new file mode 100644 index 0000000..0d2395a --- /dev/null +++ b/src/core/async/queued_thread_pool.h @@ -0,0 +1,44 @@ +#pragma once +#include <thread> +#include <queue> +#include <mutex> +#include <functional> +#include <future> +#include <memory> + +#include "thread_event.h" +#include "containers/safe_queue.h" +#include "containers/safe_vector.h" + +class queued_thread_pool { + +public: + explicit queued_thread_pool(size_t num_threads = std::thread::hardware_concurrency()); + ~queued_thread_pool(); + + template<typename F, typename... Args> + auto submit(F&& in_func, Args&&... in_args) { + using return_type = std::invoke_result_t<F, Args...>; + + auto bind_task = std::make_shared<std::packaged_task<return_type()>>( + std::bind(std::forward<F>(in_func), std::forward<Args>(in_args)...) + ); + + auto res = bind_task->get_future(); + if (stop) { throw std::runtime_error("submit on stopped ThreadPool"); } + + tasks.push([bind_task] { (*bind_task)(); }); + task_available.signal(); + + return res; + } +private: + void worker_thread(); + + std::vector<std::jthread> threads; + safe_queue<std::function<void()>> tasks; + + // 同步原语 + thread_event task_available; + std::atomic_bool stop; +}; diff --git a/src/core/async/thread_event.h b/src/core/async/thread_event.h new file mode 100644 index 0000000..5ffb288 --- /dev/null +++ b/src/core/async/thread_event.h @@ -0,0 +1,35 @@ +#pragma once +#include <condition_variable> +#include <mutex> +#include <thread> + +class thread_event { +public: + void wait() { + std::unique_lock lock(mutex); + cv.wait(lock, [this] { return signaled; }); + signaled = false; + } + + void wait(std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex); + cv.wait_for(lock, timeout, [this] { return signaled; }); + signaled = false; + } + + void signal() { + std::lock_guard lock(mutex); + signaled = true; + cv.notify_one(); + } + + void broadcast_signal() { + std::lock_guard lock(mutex); + signaled = true; + cv.notify_all(); + } +private: + std::condition_variable cv; + std::mutex mutex; + bool signaled = false; +};