将taskflow的执行器改为全局唯一,修复退出时的线程安全崩溃

This commit is contained in:
Nanako 2024-07-04 10:16:00 +08:00
parent 52b193b576
commit ae2a50a47f
6 changed files with 26 additions and 25 deletions

View File

@ -1,6 +1,7 @@
#include "application.h"
#include "audio/device/audio_device_manager.h"
#include "misc/taskflow_singleton.h"
#include "misc/singleton/singleton_manager.h"
#include "spdlog/spdlog.h"
#include "spdlog/sinks/daily_file_sink.h"
@ -26,6 +27,7 @@ void application::init() {
// 设置默认日志记录器
spdlog::set_default_logger(logger);
init_taskflow();
singleton_manager::get()->init();
@ -34,6 +36,7 @@ void application::init() {
void application::shutdown() {
singleton_manager::get()->release();
release_taskflow();
spdlog::drop_all();
spdlog::shutdown();
}

View File

@ -7,6 +7,7 @@
#include "audio/plugin_host/plugin_host.h"
#include "audio/plugin_host/plugin_host_manager.h"
#include "misc/query_timer.h"
#include "misc/taskflow_singleton.h"
#include "thread_message/thread_message_hubs.h"
IMPL_SINGLETON_INSTANCE(mixer)
@ -64,7 +65,6 @@ void mixer::init(singleton_initliazer& initliazer) {
initliazer.require<audio_device_manager>(); // 依赖音频设备管理器, 用于获取采样率和缓冲区大小
null_channel_node::init();
executor_ = new tf::Executor(std::thread::hardware_concurrency());
zero_track.rename("zero");
zero_track.init();
@ -77,17 +77,14 @@ void mixer::init(singleton_initliazer& initliazer) {
void mixer::begin_release(singleton_release_guard &release_guard) {
singleton::begin_release(release_guard);
on_latency_offset_changed.remove_object(this);
if (executor_)
executor_->wait_for_all();
delete executor_;
}
void mixer::release(singleton_release_guard& release_guard) {
singleton_t<mixer>::release(release_guard);
release_guard.require_release<audio_device_manager>();
null_channel_node::destroy();
for (const mixer_track* track : tracks_) {
delete track;
for (mixer_track* track : tracks_) {
free_track(track);
}
tracks_.clear();
}
@ -127,18 +124,15 @@ void mixer::remove_track(mixer_track* track) {
}
void mixer::process(uint32_t in_frames, circular_buffer_vector_type& out_buffer) {
if (!executor_)
return;
for (auto& flow: taskflow_) {
executor_->run(flow).wait();
g_executor->run(flow).wait();
}
get_master()->compensator_.pop(in_frames, out_buffer);
}
void mixer::reset() {
if (process_node_dirty_) {
executor_->wait_for_all();
g_executor->wait_for_all();
build_process_node();
update_taskflow(g_audio_device_manager.get_buffer_size());
process_node_dirty_ = false;
@ -229,7 +223,7 @@ void mixer::update_tasks() {
}
void mixer::update_tasks_immediate() {
executor_->wait_for_all();
g_executor->wait_for_all();
build_process_node();
update_taskflow(g_audio_device_manager.get_buffer_size());
}

View File

@ -2,8 +2,8 @@
#include "misc/delegates.h"
#include "misc/singleton/singleton.h"
#include <map>
#include <string>
#include <unordered_map>
#include "mempool.h"
#include "mixer_track.h"
@ -71,7 +71,6 @@ private:
std::vector<mixer_track*> tracks_;
std::unordered_map<mixer_track_type, mempool<>> track_pool_;
tf::Executor* executor_ = nullptr;
std::vector<tf::Taskflow> taskflow_;
bool process_node_dirty_ = false;
};

View File

@ -6,6 +6,7 @@
#include "audio/mixer/mixer.h"
#include "audio/mixer/mixer_track.h"
#include "misc/query_timer.h"
#include "misc/taskflow_singleton.h"
#include "misc/singleton/singleton_manager.h"
#include "thread_message/thread_message_hubs.h"
#include "vst2/vst2_plugin_host.h"
@ -17,7 +18,6 @@ void plugin_host_manager::init(singleton_initliazer& initliazer) {
singleton_t<plugin_host_manager>::init(initliazer);
auto* mixer_ptr = initliazer.require<mixer>();
mixer_ptr->on_remove_track.add_raw(this, &plugin_host_manager::on_mixer_track_removed);
executor_.store(new tf::Executor(std::thread::hardware_concurrency()));
}
void plugin_host_manager::release(singleton_release_guard& release_guard) {
@ -91,11 +91,7 @@ void plugin_host_manager::register_instrument_plugin(plugin_host* host) {
}
void plugin_host_manager::process(uint32_t in_frames) {
tf::Executor* executor = executor_.load();
if (!executor) {
return;
}
executor->run(taskflow_).wait();
g_executor->run(taskflow_).wait();
}
void plugin_host_manager::on_mixer_track_removed(mixer_track* track) {
@ -115,8 +111,4 @@ void plugin_host_manager::update_taskflow(uint32_t in_frames) {
}
void plugin_host_manager::begin_release(singleton_release_guard& release_guard) {
auto executor = executor_.exchange(nullptr);
if (executor)
executor->wait_for_all();
delete executor;
}

View File

@ -34,7 +34,6 @@ private:
std::vector<plugin_host*> plugin_hosts_{};
void update_taskflow(uint32_t in_frames);
std::atomic<tf::Executor*> executor_;
tf::Taskflow taskflow_;
};

View File

@ -0,0 +1,14 @@
#pragma once
#include "taskflow/taskflow.hpp"
inline tf::Executor* g_executor = nullptr;
inline void init_taskflow() {
g_executor = new tf::Executor(std::thread::hardware_concurrency());
}
inline void release_taskflow() {
if (g_executor) {
g_executor->wait_for_all();
delete g_executor;
g_executor = nullptr;
}
}