目录
一、线程池的概念
二、线程池的优点
三、线程池的应用场景
四、实现线程池
五、线程池演示
线程池是一种线程使用模式
线程过多会带来调度开销,进而影响缓存局部性和整体性能
而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务
注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
线程池本质上就是一个生产者消费者模型,其中包括了一个任务队列与若干线程
日志模块
完整的日志功能至少有日志等级、时间。最好是支持用户自定义(日志内容, 文件行,文件名等)
#pragma once
#include
#include
#include
#include
#include //日志级别
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};void LogMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOWif(level== DEBUG) return;
#endif//标准部分char stdBuffer[1024];const time_t timestamp = time(nullptr);struct tm* local_time = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%d-%d-%d-%d-%d-%d] ", gLevelMap[level], local_time->tm_year + 1900, local_time->tm_mon + 1, local_time->tm_mday, local_time->tm_hour, local_time->tm_min, local_time->tm_sec);//自定义部分char logBuffer[1024]; va_list args;va_start(args, format);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);printf("%s%s\n", stdBuffer, logBuffer);
}
任务模块
线程池中存储的是一个个任务,下面将任务进行封装
无论该任务是什么类型的,在该任务类中都必须包含仿函数,当处理该类型的任务时只需调用operator()即可
#pragma once
#include
#include
#include
#include "Log.hpp"typedef std::function fun_t;
class Task
{
public:Task(){}Task(int x, int y, fun_t func):_x(x), _y(y), _func(func) {}void operator ()(const std::string &name) {LogMessage(NORMAL, "%s处理完成: %d+%d=%d", name.c_str(), _x, _y, _func(_x, _y));}
public:int _x;int _y;fun_t _func;
};
线程模块
由于系统调用接口过于复杂,线程模块完成的便是线程的封装,降低接口的调用复杂度,提高代码的阅读性并提高代码的复用性
#pragma once
#include
#include
#include
#include class ThreadDate
{
public:void* _args;std::string _name;
};typedef void*(*func_t)(void*);
class Thread
{
public:Thread(size_t num,func_t callback,void* args): _function(callback){char nameBuffer[64];snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);_threadDate._name = nameBuffer;_threadDate._args = args;}~Thread() {}void Start() { pthread_create(&_tid, nullptr, _function, (void*)&_threadDate); }void Join() { pthread_join(_tid, nullptr); }std::string Name() { return _name; }
private:std::string _name;func_t _function;ThreadDate _threadDate;pthread_t _tid;
};
为什么线程池中需要有互斥锁和条件变量?
注意:
为什么线程池中的线程执行例程需要设置为静态方法?
使用pthread_create()函数创建线程时,需要为创建的线程传入一个Routine(执行例程),该Routine只有一个参数类型为void*的参数,以及返回类型为void*的返回值
此时Routine作为类的成员函数,该函数的第一个参数是隐藏的this指针,因此这里的Routine函数,貌似只有一个参数,而实际上有两个参数。此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译
静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数
但在静态成员函数内部无法使用非静态成员变量,因此需要在创建线程时,向Routine函数传入的当前对象的this指针,此时就能够通过该this指针在Routine函数内部调用非静态成员变量了
懒汉单例模式
整个工程中线程池应该只存在一个实例,可以使用单例模式进行实现。这里采用懒汉单例模式,其最核心的思想是"延时加载",从而能够优化服务器的启动速度
//示意代码
template
class Singleton
{static T* inst;
public:static T* GetInstance() {if (inst == NULL) {inst = new T();} return inst;}
};
只有调用GetInstance()后,才会实例化出唯一的线程池实例。但存在一个严重的问题, 线程不安全。第一次调用GetInstance()时, 若多个线程同时调用, 可能会创建出多份 T 对象的实例
template
class Singleton
{static T* inst;static std::mutex lock;
public:static T* GetInstance() {if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.if (inst == NULL) {inst = new T();} lock.unlock();} return inst;}
};
可能会有些疑惑,为什么需要判定两次是否为空指针?为什么不写成下面这样呢?
template
class Singleton
{static T* inst;static std::mutex lock;
public:static T* GetInstance() {lock.lock();if (inst == NULL) {inst = new T();} lock.unlock();return inst;}
};
因为第一个线程创建出唯一线程池实例后,后续可能依然会有该函数的调用(不是为了创建出唯一实例,而是为了获得唯一实例的地址),此时多线程就会涉及到竞争锁资源以及不断的加锁与解锁,造成时间与资源的浪费。使用双重if判断则可以避免某些情况下的锁竞争(已经存在唯一实例),从而提高性能
锁防护装置模块
RAII风格,可避免在加锁区域抛异常而导致未解锁,出作用域自动解锁
#pragma once
#include
#include class Mutex
{
public:Mutex(pthread_mutex_t *mtx):_pmtx(mtx) {}void Lock() { pthread_mutex_lock(_pmtx); }void UnLock() { pthread_mutex_unlock(_pmtx); }~Mutex() {}
private:pthread_mutex_t *_pmtx;
};// RAII风格的加锁方式
class LockGuard
{
public:LockGuard(pthread_mutex_t *mtx):_mutex(mtx) { _mutex.Lock(); }~LockGuard() { _mutex.UnLock(); }
private:Mutex _mutex;
};
线程池实现
#pragma once
#include
#include
#include
#include
#include
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Log.hpp"//懒汉模式
const int g_threadNum = 3;
template
class ThreadPool
{
public://为routine()静态函数提供pthread_mutex_t *GetMutex() { return &_mutex; }bool isEmpty() { return _taskQueue.empty(); }void WaitCond() { pthread_cond_wait(&_cond, &_mutex); }T GetTask() {T task = _taskQueue.front();_taskQueue.pop();return task;}public://需考虑多线程申请单例的情况static ThreadPool* GetThreadPool(int num = g_threadNum){if(nullptr == pool_ptr) {{LockGuard lockguard(&_init_mutex);if(nullptr == pool_ptr) {pool_ptr = new ThreadPool(num);}}}return pool_ptr;}static void* Routine(void* args) {ThreadDate* thread_date = (ThreadDate*)args;ThreadPool* thread_pool = (ThreadPool*)thread_date->_args;while(true) {T task;{LockGuard lockguard(thread_pool->GetMutex());while(thread_pool->isEmpty()) thread_pool->WaitCond();task = thread_pool->GetTask();}task(thread_date->_name);//仿函数}}void PushTask(const T& task){LockGuard lockguard(&_mutex);_taskQueue.push(task);pthread_cond_signal(&_cond);}void Run(){for(auto& iter : _threads) {iter->Start(); LogMessage(DEBUG, "%s %s", iter->Name().c_str(), "启动成功");}}~ThreadPool(){for (auto &iter : _threads) {iter->Join();delete iter;}pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:ThreadPool(int threadNum):_num(threadNum) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);for (int i = 1; i <= _num; i++) {_threads.push_back(new Thread(i, Routine, this));}}ThreadPool(const ThreadPool& others) = delete;ThreadPool& operator= (const ThreadPool& others) = delete;private:std::vector _threads;size_t _num;std::queue _taskQueue;
private:pthread_mutex_t _mutex;pthread_cond_t _cond;
private:static ThreadPool* pool_ptr;//避免编译器自动优化static pthread_mutex_t _init_mutex;
};template
ThreadPool* ThreadPool::pool_ptr = nullptr;
template
pthread_mutex_t ThreadPool::_init_mutex = PTHREAD_MUTEX_INITIALIZER;
主线程逻辑
主线程就负责不断向任务队列当中Push任务即可,此后线程池中的线程会从任务队列中获取任务并进行处理
#include "ThreadPool.hpp"
#include "Task.hpp"int main()
{ThreadPool* threadPool = ThreadPool::GetThreadPool(5);threadPool->Run();while(true){//生产的过程,制作任务的时候,要花时间int x = rand()%100 + 1;usleep(7721);int y = rand()%30 + 1;Task task(x, y, [](int x, int y)->int{return x + y;});LogMessage(NORMAL, "制作任务完成: %d+%d=?", x, y);//推送任务到线程池中threadPool->PushTask(task);sleep(1);}return 0;
}
运行代码后一瞬间就有六个线程,其中一个为主线程,另外五个是线程池内处理任务的线程
五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,五个线程中只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性(4-1-3-5-2)
注意:此后若想让线程池处理其他不同的任务请求时,只需要提供一个任务类,在该任务类当中提供对应的operator()方法即可