目录
一.生产者消费者概念
二.模拟实现基于阻塞队列的生产消费模型
2.1概念
2.2构造阻塞队列
三.信号量
3.1原理
3.2信号量函数
3.3信号量模拟互斥功能
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点:
1.阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护来。
2.生产者线程要向阻塞队列当中Push数据,若阻塞队列已经满了,那么此时该生产者需要进行等待,直到阻塞队列中有空间时再被唤醒。消费者线程要从阻塞队列当中Pop数据,若阻塞队列为空,那么此时该消费者需要进行等待,直到阻塞队列中有新的数据时再被唤醒。
3.需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。若是队列满了,生产者要进行等待,若是队列为空,消费者要进行等待。
4.当生产者或者消费者执行后,都要去唤醒另一方。
BlockQueue.hpp代码:
#include
#include
#include
#include
#include using namespace std;const int capacity=5;
template
class BlockQueue
{public:BlockQueue(int size=capacity)//初始化:size_(size){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&proCond_,nullptr);pthread_cond_init(&conCond_,nullptr);}~BlockQueue()//变量的销毁{pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&proCond_);pthread_cond_destroy(&conCond_);}void _push(const T in){lockQueue();//生产之前先加锁while(is_full()){pthread_cond_wait(&proCond_,&mutex_);//若队列满了,则进行等待,同时把锁释放,再次醒来时会重新获取锁}_q.push(in);unlockQueue();//生产完后解锁pthread_cond_signal(&conCond_);//消费者可能在等待,要去唤醒}T _pop(){lockQueue();//消费之前先加锁while(_q.empty()){pthread_cond_wait(&conCond_,&mutex_);//若队列为空,则进行等待,同时把锁释放,再次醒来时会重新获取锁}T out=_q.front();_q.pop();unlockQueue();//消费完后解锁pthread_cond_signal(&proCond_);//唤醒生产者return out;}private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool is_full(){return size_==_q.size();}queue _q;pthread_mutex_t mutex_;pthread_cond_t proCond_;pthread_cond_t conCond_;int size_;
};
模拟任务task.cpp代码:
#include
#include class Task
{
public:Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op){}int operator()(){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;}
private:int elemOne_;int elemTwo_;char operator_;
};
BlockQueue.cpp代码:
#include"BlockQueue.hpp"
#include"task.hpp"
#include const std::string ops = "+-*/%";
void *productor(void *args)
{BlockQueue *bqp = static_cast *>(args);while (true){//制作任务int one = rand() % 50;int two = rand() % 20;char op = ops[rand() % ops.size()];Task t(one, two, op);bqp->_push(t);//生产任务cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;sleep(1);}
}
void *consumer(void *args)
{BlockQueue *bqp = static_cast *>(args);while (true){Task t = bqp->_pop(); // 消费任int result = t(); //处理任务 int one, two;char op;t.get(&one, &two, &op);cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;}
}
int main()
{srand((unsigned long)time(nullptr));BlockQueue bq; //阻塞队列pthread_t tid1, tid2;pthread_create(&tid1, nullptr, consumer, &bq);pthread_create(&tid2, nullptr, productor, &bq);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);return 0;
}
结果:
当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
概念:本质上就是一个计数器,去划分临界资源的数量。
每个执行流在进入临界区之前都要先申请信号量,申请成功就有了访问临界资源的权限,当操作完毕后再释放信号量。就是对信号量做加减操作。
信号量的PV操作
p操作:申请信号量为p操作,当申请成功,就获得了访问该临界资源的权限,同时,该临界资源的数量也减少了一份,所以计数器要减一。
v操作:释放信号量为v操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一。
补充:PV操作必须是原子性的。多个执行流访问临界资源也是竞争式的,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。当信号量值为零时被申请,那么该执行流会在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
初始化信号量函数:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数解释:
销毁信号量函数:
int sem_destroy(sem_t *sem);
申请信号量函数(等待):
int sem_wait(sem_t *sem);
申请成功后值减一
释放信号量函数(发布):
int sem_post(sem_t *sem);
释放信号量后值加一
上面这些函数调用成功返回0,失败返回-1。
当把信号量的值设置为1时,那么它说明临界资源的数量只有一分,信号量的作用基本等价于互斥锁。
class Sem{
public:Sem(int num){sem_init(&_sem, 0, num);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}
private:sem_t _sem;
};Sem sem(1); //二元信号量
int tickets=1000;
void* getTickets(void* args)
{string s=(char*)args;while(true){sem.P();if(tickets>0){usleep(10000);cout<