线程
线程的定义在百度上的定义如下:
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
一个进程可以理解为一个运行软件,其至少包含一个主线程。如果将所有任务都放在主线程中运行,那么执行时会变得异常卡顿。为了减少任务的工作压力,就会引入线程池的概念。
线程池
什么是线程池?
线程池(Thread Pool)是一种并发编程中常用的技术,用于管理和重用线程。它由线程池管理器、工作队列和线程池线程组成。用户可自定义将任务加入指定的线程中,或者自动将任务放置于空闲的线程中。
线程池主要由三部分组成:
- 线程池管理器(ThreadPoolExecutor):负责创建、管理和控制线程池。它负责线程的创建、销毁和管理,以及线程池的状态监控和调度任务。
- 工作队列(BlockingQueue):用于存储待执行的任务。当线程池中的线程都在执行任务时,新的任务会被放入工作队列中等待执行。
- 线程池线程(Worker Thread):实际执行任务的线程。线程池中会维护一组线程,这些线程可以被重复使用,从而避免了频繁创建和销毁线程的开销。
线程池管理器
创建线程池管理器
ThreadPool::ThreadPool(size_t numThreads) :
numThreads_(numThreads)
{
std::cout << "Initializing ThreadPool with " << numThreads_ << " threads." << std::endl;
for(int i = 0; i < numThreads_; ++i) {
ThreadQueue* t = new ThreadQueue();
std::cout << "Created ThreadQueue " << i << std::endl;
thread_queues_.push_back(t);
}
}
在构造函数中创建需要的线程队列,通过动态数组容器动态数组容器(std::vector)保存起来。
其实也可以使用其他STL容器保存线程,比如字典(std::map),根据需求进行选择。
在线程池管理器中可以同时管理线程队列的启动,停止和内存销毁。
工作线程
工作线程中会包含工作队列和线程两部分组成。
在工作线程的中创建线程,定义一个run函数绑定到线程中。
thread_ = new std::thread(std::bind(&ThreadQueue::run, this));
在线程队列中如何能高效的执行任务?
使用条件变量(std::condition_variable)让无任务的线程进入等待状态。
条件变量是一种线程同步机制,用于在多线程编程中实现线程间的等待和通知机制。它通常与互斥锁配合使用。
如果任务队列中没有任务或者线程池管理器下发停止命令,那么线程就会进入等待状态。直到有任务添加时下发任务时再次唤醒。
添加任务并下发唤醒线程。
void ThreadQueue::addTask(const std::function<void()> &task)
{
std::lock_guard<std::mutex> lock(mtx_);
taskQueue_.push(task);
std::cout << "Task added to the queue." << std::endl;
cond_.notify_one(); // 通知工作线程
}
任务线程具体执行过程
void ThreadQueue::run()
{
while (true)
{
std::function<void()> task;
std::unique_lock<std::mutex> lock(mtx_);
cond_.wait_for(lock, std::chrono::milliseconds(1000), [&]{
return !taskQueue_.empty() || is_stopped_; });
if (is_stopped_) {
break;
}
if (!taskQueue_.empty()){
// lock.lock();
std::cout << "id: " << thread_->get_id() << " Executing task..." << std::endl;
// auto task = taskQueue_.front();
task = std::move(taskQueue_.front());
taskQueue_.pop();
task(); // Execute the task
// lock.unlock();
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}

参与讨论
(Participate in the discussion)
参与讨论
没有发现评论
暂无评论