线程

线程的定义在百度上的定义如下:

线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System VSunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。

一个进程可以理解为一个运行软件,其至少包含一个主线程。如果将所有任务都放在主线程中运行,那么执行时会变得异常卡顿。为了减少任务的工作压力,就会引入线程池的概念。

线程池

什么是线程池?

线程池(Thread Pool)是一种并发编程中常用的技术,用于管理和重用线程。它由线程池管理器、工作队列和线程池线程组成。用户可自定义将任务加入指定的线程中,或者自动将任务放置于空闲的线程中。

线程池主要由三部分组成:

  • 线程池管理器(ThreadPoolExecutor):负责创建、管理和控制线程池。它负责线程的创建、销毁和管理,以及线程池的状态监控和调度任务。
  • 工作队列(BlockingQueue):用于存储待执行的任务。当线程池中的线程都在执行任务时,新的任务会被放入工作队列中等待执行。
  • 线程池线程(Worker Thread):实际执行任务的线程。线程池中会维护一组线程,这些线程可以被重复使用,从而避免了频繁创建和销毁线程的开销。

线程池管理器

创建线程池管理器

ThreadPool::ThreadPool(size_t numThreads) :
numThreads_(numThreads)
{
    std::cout << "Initializing ThreadPool with " << numThreads_ << " threads." << std::endl;
    for(int i = 0; i < numThreads_; ++i) {
        ThreadQueue* t = new ThreadQueue();
        std::cout << "Created ThreadQueue " << i << std::endl;
        thread_queues_.push_back(t);
    }
}

在构造函数中创建需要的线程队列,通过动态数组容器动态数组容器(std::vector)保存起来。

其实也可以使用其他STL容器保存线程,比如字典(std::map),根据需求进行选择。

在线程池管理器中可以同时管理线程队列的启动,停止和内存销毁。

工作线程

工作线程中会包含工作队列和线程两部分组成。

在工作线程的中创建线程,定义一个run函数绑定到线程中。

thread_ = new std::thread(std::bind(&ThreadQueue::run, this));

在线程队列中如何能高效的执行任务?

使用条件变量(std::condition_variable)让无任务的线程进入等待状态。

条件变量是一种线程同步机制,用于在多线程编程中实现线程间的等待和通知机制。它通常与互斥锁配合使用。

如果任务队列中没有任务或者线程池管理器下发停止命令,那么线程就会进入等待状态。直到有任务添加时下发任务时再次唤醒。

添加任务并下发唤醒线程。

void ThreadQueue::addTask(const std::function<void()> &task)
{   
    std::lock_guard<std::mutex> lock(mtx_);
    taskQueue_.push(task);
    std::cout << "Task added to the queue." << std::endl;
    cond_.notify_one();  // 通知工作线程
}

任务线程具体执行过程

void ThreadQueue::run()
{
    while (true)
    {
        std::function<void()> task;
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait_for(lock, std::chrono::milliseconds(1000), [&]{ 
            return !taskQueue_.empty() || is_stopped_; });
        if (is_stopped_) {
            break;
        }
        if (!taskQueue_.empty()){
            // lock.lock();
            std::cout << "id: " << thread_->get_id() << " Executing task..." << std::endl;
            // auto task = taskQueue_.front();
            task = std::move(taskQueue_.front());
            taskQueue_.pop();
            task(); // Execute the task
            // lock.unlock();
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    
}