这是一个固定线程数量的线程池
背景
通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了pthread_create以及释放线程的过程中。那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。
线程池原理
使用了队列来存储任务,并使用互斥量和条件变量来实现线程之间的同步。enqueue()函数用于将任务添加到队列中,线程池中的线程将从队列中取出任务并执行。
各部分代码解释
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back(
[this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
}
当线程启动时,它会进入 while 循环,准备从任务队列中获取任务并执行。
然后,线程会获取任务队列的互斥锁,以确保在访问任务队列时的线程安全。
一旦获取了互斥锁,线程会调用条件变量 condition 的 wait() 函数,等待条件变为真。这个条件是 stop 标志位被设置为 true 或者任务队列不为空。如果条件为假,线程会被阻塞,等待条件变为真时再继续执行。
当有任务被添加到任务队列中,或者线程池被要求停止时,条件变为真,线程会从 wait() 函数中返回,并继续执行。
在条件变为真后,线程会检查 stop 标志位是否为 true,以及任务队列是否为空。如果 stop 标志位为 true 且任务队列为空,则线程会直接返回,退出循环,结束线程的执行。
否则,线程会从任务队列中取出一个任务,释放互斥锁,并执行局部变量 task 中存储的任务。
循环会继续迭代,线程会再次尝试从任务队列中获取任务并执行,直到收到停止信号并且任务队列为空时,线程才会退出循环,结束执行。
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
在添加任务到任务队列之后,调用 condition 条件变量的 notify_one() 函数,通知一个等待中的线程有新任务可执行。这样可以避免因为任务队列中有新任务而导致的线程处于等待状态。
代码
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back(
[this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
void printHello() {
std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}
int main() {
ThreadPool pool(4); // 创建一个包含4个线程的线程池
// 添加打印任务到线程池
for (int i = 0; i < 8; ++i) {
pool.enqueue([] { printHello(); });
}
return 0;
}