新增submit_with_main_thread_callback

This commit is contained in:
Nanako 2025-01-06 21:28:23 +08:00
parent 9941d24190
commit 8e635ecc87

View File

@ -30,8 +30,8 @@ public:
/**
* @brief 线
* @tparam F
* @tparam Args
* @tparam f
* @tparam args
* @param in_func
* @param in_args
* @return std::future
@ -39,26 +39,101 @@ public:
* auto res = thread_pool.submit([](int a, int b) { return a + b; }, 1, 2);
* @endcode
*/
template<typename F, typename... Args>
auto submit(F&& in_func, Args&&... in_args) {
using return_type = std::invoke_result_t<F, Args...>;
template<typename f, typename ...args>
auto submit(f&& in_func, args&&... in_args) {
using return_type = std::invoke_result_t<f, args...>;
if (stop) { throw std::runtime_error("submit on stopped ThreadPool"); }
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(in_func), std::forward<Args>(in_args)...)
std::bind(std::forward<f>(in_func), std::forward<args>(in_args)...)
);
auto res = task->get_future();
{
std::lock_guard lock(queue_mutex);
tasks.push([task] { (*task)(); });
tasks.emplace([task] { (*task)(); });
}
condition.notify_one();
return res;
}
/**
* @brief 线线
* @tparam f
* @tparam callback
* @tparam args
* @param in_func
* @param in_callback
* @param in_args
* @return std::future
* @code
* auto func = [](int a, int b) { return a + b; };
* auto callback = [](std::optional<int> result) { std::cout << result.value_or(-1) << std::endl; };
* auto res = thread_pool.submit_with_main_thread_callback(func, callback, 1, 2);
* @endcode
*/
template<typename f, typename callback, typename ...args>
auto submit_with_main_thread_callback(f&& in_func, callback&& in_callback, args&&... in_args) {
using return_type = std::invoke_result_t<f, args...>;
if (stop) { throw std::runtime_error("submit on stopped ThreadPool"); }
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<f>(in_func), std::forward<args>(in_args)...)
);
auto res = task->get_future();
{
auto task_with_callback = [this, task, callback = std::forward<callback>(in_callback), shared_res = res.share()] {
std::optional<return_type> callback_value;
try {
(*task)();
callback_value = shared_res.get();
} catch (...) {
// 如果出现异常,将异常信息传递给回调函数
callback_value = std::nullopt;
}
// 创建一个包装后的回调任务,并将其添加到主线程回调队列
std::lock_guard callback_lock(main_queue_mutex);
main_thread_callbacks.emplace([callback, callback_value] {
callback(callback_value);
});
};
std::lock_guard lock(queue_mutex);
tasks.emplace(task_with_callback);
}
condition.notify_one();
return res;
}
/**
* @brief 线线
*/
void process_main_thread_callbacks() {
{
std::lock_guard lock(main_queue_mutex);
if (main_thread_callbacks.empty()) {
return;
}
}
std::queue<std::function<void()>> callbacks;
{
std::lock_guard lock(main_queue_mutex);
std::swap(callbacks, main_thread_callbacks);
}
while (!callbacks.empty()) {
auto &callback = callbacks.front();
callback();
callbacks.pop();
}
}
private:
void worker_thread();
@ -69,4 +144,8 @@ private:
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic_bool stop;
// 主线程回调队列
std::queue<std::function<void()>> main_thread_callbacks;
std::mutex main_queue_mutex;
};