2.线程池头文件编写
根据上面的思路,开始编写头文件,在头文件中定义出来我们要用到的变量和函数
1.引入头文件,并使用条件编译
这个线程池头文件threadpool.h中,既有系统提供的头文件,也有我们自定义的头文件,我们需要使用==条件编译==语句对这个线程池头文件threadpool.h进行条件编译,防止后面的代码引用这个头文件时出现==头文件重复引用==的问题
有关条件编译的知识点在这篇文章里:宏定义与条件编译
头文件引用的格式如下:
- 系统提供的头文件: #include <xxx.h>
头文件存放在一对尖括号< >内,表示引用==系统提供的头文件==,例如,stdio.h、stdlib.h头文件,是系统提供的头文件,所以,引用时,使用尖括号包含。
- 用户自定义的头文件: #include “xxx.h”
第二种格式,头文件存放在一对双引号 “” 内,表示 引用==用户自定义的头文件==。例如我们自己定义了一个test.h头文件,那么,就使用双引号包含。
代码如下:
#ifndef THREADPOOL_H#define THREADPOOL_H
#include <list>#include <cstdio>#include <exception>#include <pthread.h>#include "../lock/locker.h"#include "../CGImysql/sql_connection_pool.h"
// 代码主体
#endif
注意最后的#endif
不要忘记写,不写会报错
2.定义线程池模板类
然后就是编写代码主体的部分了,先定义一个线程池的模板类,具体的部分可以参考注释
代码如下:
// 线程池类定义template <typename T>class threadpool {public: // actor_model 用于切换的模型 // connPool 数据库连接池指针 // thread_number 是线程池中线程的数量 // max_request 请求队列中最大请求数量 threadpool(int actor_model, connection_pool* connPool, int thread_number = 8, int max_request = 10000); ~threadpool();
// ?这干嘛的,参数 state 干嘛的? bool append(T* request, int state); // 向请求队列 m_workqueue 中插入请求对象 // 并通知 pthread_create() 创建的工作线程处理该任务 bool append_p(T* request);
private: /* * 线程池的工作函数,启动一个新线程并调用 threadpool 对象的 * run() 函数来处理任务 */ static void* worker(void* arg); // 线程池的主函数,负责不断检查任务队列 void run();
private: int m_thread_number; // 线程池中的线程数 int m_max_requests; // 请求队列中最大待处理请求数 pthread_t* m_threads; // 线程池数组,大小为 m_thread_number std::list<T*> m_workqueue; // 请求队列 locker m_queuelocker; // 保护请求队列的互斥锁 sem m_queuestat; // 是否有任务需要处理的信号量 connection_pool* m_connPool; // 数据库连接池 int m_actor_model; // 模型切换};
3.模板外实现线程池的构造函数
在线程池的构造函数中,创建线程函数,线程分离函数需要注意一下
1.创建线程函数
pthread_create()函数是一个用于创建线程的函数,它属于POSIX线程库(pthread)中的一员。该函数具有以下原型:
int pthread_create(pthread_t* thread, const pthread_attr_t* attr, void* (*start_routine)(void*), void* arg);
参数说明:
thread
:指向pthread_t类型变量的指针,用于存储新创建线程的标识符。attr
:指向pthread_attr_t类型变量的指针,用于==指定新线程的属性==。可以通过该参数来设置线程的==分离状态==、==栈大小==等属性。如果传递==NULL==,则使用默认属性。start_routine
:指向一个函数(要求是==静态函数==)的指针,该函数作为新线程的==入口点==。新线程将==从该函数开始执行==。arg
:传递给start_routine
函数的参数。可以通过该参数==向新线程传递数据==。
返回值说明:
- 如果成功创建线程,函数返回0。
- 如果发生错误,函数返回一个==非零错误码==,表示错误的==类型==。
创建新线程后,新线程将在start_routine
函数中开始执行,并且可以通过arg
参数传递数据。线程可以执行任意类型的任务,包括==并行处理==、==异步操作==等。
需要注意的是,创建线程的成功与否并==不保证线程的立即执行==。==线程的调度==是==由操作系统决定==的,可能会有一定的延迟。
使用pthread_create()
函数创建线程时,通常还需要使用其他线程相关的函数来操作线程,例如pthread_join()
函数用于等待线程的结束,pthread_exit()
函数用于线程的退出等。
2.线程分离函数
pthread_detach()函数是一个用于将线程设置为==分离状态==的函数,它属于POSIX线程库(pthread)中的一员。该函数具有以下原型:
int pthread_detach(pthread_t thread);
参数说明:
thread
:要设置为分离状态的线程的标识符。
返回值说明:
- 如果成功将线程设置为分离状态,函数返回0。
- 如果发生错误,函数返回一个非零错误码,表示错误的类型。
将线程设置为分离状态后,==线程结束时系统会自动释放其占用的资源==,无需通过pthread_join()
函数等待线程的结束。这样可以==避免线程资源的泄漏==,==提高程序的性能==。
需要注意的是,==一旦将线程设置为分离状态,就无法再通过pthread_join()
函数等待该线程的结束,也无法获取线程的返回值==。因此,只有当==不再需要与线程进行交互或者不需要获取其返回值==时,才应将线程设置为分离状态。
使用pthread_detach()
函数时需要注意以下几点:
- 线程必须是==可连接(joinable)==状态才能被设置为分离状态。创建线程时可以通过设置线程属性(pthread_attr_t)的方式指定线程的可连接性。
- 分离状态的线程==不会保持僵尸状态(zombie)==,即使线程结束后也不会被保留在系统中。因此,在调用
pthread_detach()
函数之前,需要确保线程已经执行完毕或不再需要等待其结束。 - ==一旦线程被设置为分离状态,就无法再改变其状态==。如果需要重新连接一个已经分离的线程,只能通过创建新的线程来实现。
3.模板外参数列表初始化的方法
在C++中,模板函数通常会==将实现与声明分离==。如果你想在模板外部实现模板函数,可以按照以下步骤进行:
- 在头文件中声明模板函数,但不进行具体的实现。例如:
template<typename T>void foo(T arg);
- 在头文件之外的源文件中,实现模板函数的具体内容。这里需要注意的是,由于模板函数的具体实现需要知道模板参数的具体类型,因此需要在函数名后加上模板参数列表。例如:
template<typename T>void foo(T arg) { // 实现函数的具体内容}
- 如果在==模板函数中使用了其他的自定义类型或函数==,需要确保==它们的声明在模板函数的实现之前==。可以通过在==头文件中提供这些类型或函数的声明==来实现。
需要注意的是,==模板函数的声明和实现都需要在编译阶段被解析==,因此最好将==模板函数的声明和实现都放在头文件==中。这样,在使用模板函数的地方,只需要包含对应的头文件即可。
下面是一个完整的示例,展示了在模板外部实现模板函数并初始化参数列表的过程:
// 头文件 template_example.h
template<typename T>void foo(T arg);
// 源文件 template_example.cpp
#include "template_example.h"
template<typename T>void foo(T arg) { // 实现函数的具体内容}// 显式实例化模板函数,以初始化参数列表template void foo<int>(int arg);
// 主文件 main.cpp
#include "template_example.h"
int main() { foo(42); return 0;}
在源文件中的模板函数实现之后,通过显式实例化模板函数的方式,可以初始化参数列表。在上面的示例中,使用了template void foo<int>(int arg);
来显式实例化了foo()
模板函数,并将其参数列表初始化为int
类型。
这样,在主文件中的main()
函数中调用foo()
时,参数为整数,编译器就会根据参数的类型选择正确的实例化版本进行调用。
代码如下:
// 模板外实现线程池的构造函数template <typename T>threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool) { // 参数列表初始化 // 异常判断,线程数或最大请求数小于 0,报错 if (thread_number <= 0 || max_requests <= 0) { throw std::exception(); }
// 创建线程池数组(线程 id 初始化?) m_threads = new pthread_t[m_thread_number]; // 创建线程池数组失败,抛出异常(判断线程 id 是否初始化成功?) if (!m_threads) { throw std::exception(); }
// 创建 thread_number 个线程,并将它们设置为线程分离状态 for (int i = 0; i < thread_number; ++i) { /* * pthread_create() 创建线程函数的参数需要特别注意 * 该函数原型中的参数如下: * pthread_t* thread_tid // 线程数组中线程的地址 * const pthread_attr_t* attr // 指向线程属性的指针,通常 * 设置为NULL * void* (*start_routine)(void*) // 处理线程函数的地址,注意 * 这个参数的类型为函数指针, * 要求该函数为静态函数;如 * 果该函数为类成员函数时, * 需要设置为静态成员函数 * void* arg // start_routine() 中的参数 */ // pthread_create() 函数成功时返回 0,失败时返回错误号 errno if (pthread_create(m_threads + i, NULL, worker, this) != 0) { // 创建失败,删除线程池数组,并抛出异常 delete[] m_threads; throw std::exception(); }
/* * pthread_detach() 函数 * 作用: 实现线程分离 * 参数: 线程 * 返回值:成功返回 0,失败返回错误号 */ if (pthread_detach(m_threads[i])) { // 线程设置分离状态失败,删除线程池数组,抛出异常 delete[] m_threads; throw std::exception(); } }}
4.模板外实现线程池的析构函数
线程池的析构函数还是比较简单的,代码如下:
// 模板外实现线程池的析构函数template <typename T>threadpool<T>::~threadpool() { // 删除线程池数组 delete[] m_threads;}
5.模板外实现向请求队列中插入请求的函数
这个函数在之前的代码中只有一个,我这次是看的项目源码,发现源码里有两个,功能是差不多的,只是参数不太一样
1.两个参数(T* request, int state)
的函数
代码如下:
// 模板外实现向任务队列插入任务请求函数bool threadpool<T>::append(T* request, int state) { m_queuelocker.lock(); // 添加互斥锁 // 当任务队列的大小大于最大请求数量时,解锁并报错 if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // ???? request->m_state = state; // 向队列中添加一个请求 m_workqueue.push_back(request); m_queuelocker.unlock(); // 解锁 /* * 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性, * 需要互斥锁保证多个线程不会争抢 */
// 增加信号量,通知线程池里的线程,有新任务要处理 m_queuestat.post(); /* * 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加 * 信号量;在线程初始化时,每个工作线程都被创建并阻塞在 * m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大 * 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请 * 求 */ return true;}
2.一个参数(T* request)
的函数
代码如下:
// 模板外实现向任务队列插入任务请求函数bool threadpool<T>::append_p(T* request) { m_queuelocker.lock(); // 添加互斥锁 // 当任务队列的大小大于最大请求数量时,解锁并报错 if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // 向队列中添加一个请求 m_workqueue.push_back(request); m_queuelocker.unlock(); // 解锁 /* * 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性, * 需要互斥锁保证多个线程不会争抢 */
// 增加信号量,通知线程池里的线程,有新任务要处理 m_queuestat.post(); /* * 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加 * 信号量;在线程初始化时,每个工作线程都被创建并阻塞在 * m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大 * 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请 * 求 */ return true;}
6.线程池的工作函数
代码如下:
// 线程池的工作函数,内部访问私有成员函数 run(),完成线程处理要求template <typename T>void* threadpool<T>::worker(void* arg) { /* * 启动线程时,传入 void* 类型参数 arg;这个参数实际上是一个指向 * threadpool 结构体的指针,所以将它强制转换成 threadpool* 类型并 * 赋值给 pool */ threadpool* pool = (threadpool*)arg; // 启动线程池中的一个或多个线程,将待处理任务提交给线程池处理 pool->run(); return pool;}
7.线程池主函数
代码如下:
// 工作线程从请求队列中取出某个任务进行处理,注意线程同步template <typename T>void threadpool<T>::run() { // ???这里原来的参数是:!m_stop??? while (true) { /* * 信号量等待: * 等待 append() 函数传过来的信号量,收到表示需要运行的线程池, * 使用其中的线程处理来处理任务 */ m_queuestat.wait(); m_queuelocker.lock(); // 被唤醒后先加互斥锁 /* * 收到信号量时,任务队列 m_workqueue 可能为空,也可能不为空, * 这取决于在等待信号量之前是否有新任务被添加到了队列中, * 如果没有新任务被添加,那么 m_workqueue 仍然为空; * 如果有新任务被添加,那么 m_workqueue 将不为空; * 需要注意的是,在多线程中,一个线程在等待信号量时,另一个线 * 程可能会往队列中添加新任务,因此需要加锁来保证对任务队列的 * 访问是线程安全的,这样可以避免出现竞争态条件(race * condition,也就是线程不同步) */ if (m_workqueue.empty()) { m_queuelocker.unlock(); // 解锁 continue; // 跳过后面的代码,开始下一个循环 } // 从请求队列中取出队首请求,并将其弹出队列 T* request = m_workqueue.front(); m_workqueue.pop_front();
m_queuelocker.unlock(); // 取完请求后,解锁 if (!request) { continue; // 没有取到就继续循环 } // ?????? if (1 == m_actor_model) {
if (0 == request->m_state) {
if (request->read_once()) {
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process(); } else {
request->improv = 1;
request->timer_flag = 1; } } else {
if (request->write()) {
request->improv = 1; } else {
request->improv = 1;
request->timer_flag = 1; } } } else {
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process(); } }}
8.线程池头文件完整代码
代码如下:
#ifndef THREADPOOL_H#define THREADPOOL_H
#include <list>#include <cstdio>#include <exception>#include <pthread.h>#include "../lock/locker.h"#include "../CGImysql/sql_connection_pool.h"
// 线程池类定义template <typename T>class threadpool {public: // actor_model 用于切换的模型 // connPool 数据库连接池指针 // thread_number 是线程池中线程的数量 // max_request 请求队列中最大请求数量 threadpool(int actor_model, connection_pool* connPool, int thread_number = 8, int max_request = 10000); ~threadpool();
// ?这干嘛的,参数 state 干嘛的? bool append(T* request, int state); // 向请求队列 m_workqueue 中插入任务请求 // 并通知 pthread_create() 创建的工作线程处理该任务 bool append_p(T* request);
private: /* * 线程池的工作函数,启动一个新线程并调用 threadpool 对象的 * run() 函数来处理任务 */ static void* worker(void* arg); // 线程池的主函数,负责不断检查任务队列 void run();
private: int m_thread_number; // 线程池中的线程数 int m_max_requests; // 请求队列中最大待处理请求数 pthread_t* m_threads; // 线程池数组,大小为 m_thread_number std::list<T*> m_workqueue; // 请求队列 locker m_queuelocker; // 保护请求队列的互斥锁 sem m_queuestat; // 是否有任务需要处理的信号量 connection_pool* m_connPool; // 数据库连接池 int m_actor_model; // 模型切换};
// 模板外实现线程池的构造函数template <typename T>threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool) { // 参数列表初始化 // 异常判断,线程数或最大请求数小于 0,报错 if (thread_number <= 0 || max_requests <= 0) { throw std::exception(); }
// 创建线程池数组(线程 id 初始化?) m_threads = new pthread_t[m_thread_number]; // 创建线程池数组失败,抛出异常(判断线程 id 是否初始化成功?) if (!m_threads) { throw std::exception(); }
// 创建 thread_number 个线程,并将它们设置为线程分离状态 for (int i = 0; i < thread_number; ++i) { /* * pthread_create() 创建线程函数的参数需要特别注意 * 该函数原型中的参数如下: * pthread_t* thread_tid // 线程数组中线程的地址 * const pthread_attr_t* attr // 指向线程属性的指针,通常 * 设置为NULL * void* (*start_routine)(void*) // 处理线程函数的地址,注意 * 这个参数的类型为函数指针, * 要求该函数为静态函数;如 * 果该函数为类成员函数时, * 需要设置为静态成员函数 * void* arg // start_routine() 中的参数 */ // pthread_create() 函数成功时返回 0,失败时返回错误号 errno if (pthread_create(m_threads + i, NULL, worker, this) != 0) { // 创建失败,删除线程池数组,并抛出异常 delete[] m_threads; throw std::exception(); }
/* * pthread_detach() 函数 * 作用: 实现线程分离 * 参数: 线程 * 返回值:成功返回 0,失败返回错误号 */ if (pthread_detach(m_threads[i])) { // 线程设置分离状态失败,删除线程池数组,抛出异常 delete[] m_threads; throw std::exception(); } }}
// 模板外实现线程池的析构函数template <typename T>threadpool<T>::~threadpool() { // 删除线程池数组 delete[] m_threads;}
// 模板外实现向任务队列插入任务请求函数bool threadpool<T>::append(T* request, int state) { m_queuelocker.lock(); // 添加互斥锁 // 当任务队列的大小大于最大请求数量时,解锁并报错 if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // ???? request->m_state = state; // 向队列中添加一个请求 m_workqueue.push_back(request); m_queuelocker.unlock(); // 解锁 /* * 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性, * 需要互斥锁保证多个线程不会争抢 */
// 增加信号量,通知线程池里的线程,有新任务要处理 m_queuestat.post(); /* * 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加 * 信号量;在线程初始化时,每个工作线程都被创建并阻塞在 * m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大 * 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请 * 求 */ return true;}
// 模板外实现向任务队列插入任务请求函数bool threadpool<T>::append_p(T* request) { m_queuelocker.lock(); // 添加互斥锁 // 当任务队列的大小大于最大请求数量时,解锁并报错 if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // 向队列中添加一个请求 m_workqueue.push_back(request); m_queuelocker.unlock(); // 解锁 /* * 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性, * 需要互斥锁保证多个线程不会争抢 */
// 增加信号量,通知线程池里的线程,有新任务要处理 m_queuestat.post(); /* * 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加 * 信号量;在线程初始化时,每个工作线程都被创建并阻塞在 * m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大 * 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请 * 求 */ return true;}
// 线程池的工作函数,内部访问私有成员函数 run(),完成线程处理要求template <typename T>void* threadpool<T>::worker(void* arg) { /* * 启动线程时,传入 void* 类型参数 arg;这个参数实际上是一个指向 * threadpool 结构体的指针,所以将它强制转换成 threadpool* 类型并 * 赋值给 pool */ threadpool* pool = (threadpool*)arg; // 启动线程池中的一个或多个线程,将待处理任务提交给线程池处理 pool->run(); return pool;}
// 工作线程从请求队列中取出某个任务进行处理,注意线程同步template <typename T>void threadpool<T>::run() { // ???这里原来的参数是:!m_stop??? while (true) { /* * 信号量等待: * 等待 append() 函数传过来的信号量,收到表示需要运行的线程池, * 使用其中的线程处理来处理任务 */ m_queuestat.wait(); m_queuelocker.lock(); // 被唤醒后先加互斥锁 /* * 收到信号量时,任务队列 m_workqueue 可能为空,也可能不为空, * 这取决于在等待信号量之前是否有新任务被添加到了队列中, * 如果没有新任务被添加,那么 m_workqueue 仍然为空; * 如果有新任务被添加,那么 m_workqueue 将不为空; * 需要注意的是,在多线程中,一个线程在等待信号量时,另一个线 * 程可能会往队列中添加新任务,因此需要加锁来保证对任务队列的 * 访问是线程安全的,这样可以避免出现竞争态条件(race * condition,也就是线程不同步) */ if (m_workqueue.empty()) { m_queuelocker.unlock(); // 解锁 continue; // 跳过后面的代码,开始下一个循环 } // 从请求队列中取出队首请求,并将其弹出队列 T* request = m_workqueue.front(); m_workqueue.pop_front();
m_queuelocker.unlock(); // 取完请求后,解锁 if (!request) { continue; // 没有取到就继续循环 } // ?????? if (1 == m_actor_model) {
if (0 == request->m_state) {
if (request->read_once()) {
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process(); } else {
request->improv = 1;
request->timer_flag = 1; } } else {
if (request->write()) {
request->improv = 1; } else {
request->improv = 1;
request->timer_flag = 1; } } } else {
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process(); } }}#endif