Linux生产者消费者与信号量
创始人
2024-02-20 12:01:38
0

目录

一.生产者消费者概念

二.模拟实现基于阻塞队列的生产消费模型

2.1概念

2.2构造阻塞队列 

 三.信号量

3.1原理

3.2信号量函数

3.3信号量模拟互斥功能


一.生产者消费者概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。

 生产者消费者模型是多线程同步与互斥的一个经典场景,其特点:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

二.模拟实现基于阻塞队列的生产消费模型

2.1概念

1.阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护来。

2.生产者线程要向阻塞队列当中Push数据,若阻塞队列已经满了,那么此时该生产者需要进行等待,直到阻塞队列中有空间时再被唤醒。消费者线程要从阻塞队列当中Pop数据,若阻塞队列为空,那么此时该消费者需要进行等待,直到阻塞队列中有新的数据时再被唤醒。

3.需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。若是队列满了,生产者要进行等待,若是队列为空,消费者要进行等待。

4.当生产者或者消费者执行后,都要去唤醒另一方。

2.2构造阻塞队列 

BlockQueue.hpp代码:

#include 
#include 
#include 
#include 
#include using namespace std;const int capacity=5;
template
class BlockQueue
{public:BlockQueue(int size=capacity)//初始化:size_(size){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&proCond_,nullptr);pthread_cond_init(&conCond_,nullptr);}~BlockQueue()//变量的销毁{pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&proCond_);pthread_cond_destroy(&conCond_);}void _push(const T in){lockQueue();//生产之前先加锁while(is_full()){pthread_cond_wait(&proCond_,&mutex_);//若队列满了,则进行等待,同时把锁释放,再次醒来时会重新获取锁}_q.push(in);unlockQueue();//生产完后解锁pthread_cond_signal(&conCond_);//消费者可能在等待,要去唤醒}T _pop(){lockQueue();//消费之前先加锁while(_q.empty()){pthread_cond_wait(&conCond_,&mutex_);//若队列为空,则进行等待,同时把锁释放,再次醒来时会重新获取锁}T out=_q.front();_q.pop();unlockQueue();//消费完后解锁pthread_cond_signal(&proCond_);//唤醒生产者return out;}private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool is_full(){return size_==_q.size();}queue _q;pthread_mutex_t mutex_;pthread_cond_t proCond_;pthread_cond_t conCond_;int size_;
};

模拟任务task.cpp代码:

#include 
#include class Task
{
public:Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op){}int operator()(){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;}
private:int elemOne_;int elemTwo_;char operator_;
};

 BlockQueue.cpp代码:

#include"BlockQueue.hpp"
#include"task.hpp"
#include const std::string ops = "+-*/%";
void *productor(void *args)
{BlockQueue *bqp = static_cast *>(args);while (true){//制作任务int one = rand() % 50;int two = rand() % 20;char op = ops[rand() % ops.size()];Task t(one, two, op);bqp->_push(t);//生产任务cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;sleep(1);}
}
void *consumer(void *args)
{BlockQueue *bqp = static_cast *>(args);while (true){Task t = bqp->_pop(); // 消费任int result = t();    //处理任务 int one, two;char op;t.get(&one, &two, &op);cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;}
}
int main()
{srand((unsigned long)time(nullptr));BlockQueue bq; //阻塞队列pthread_t tid1, tid2;pthread_create(&tid1, nullptr, consumer, &bq);pthread_create(&tid2, nullptr, productor, &bq);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);return 0;
}

结果:

 三.信号量

3.1原理

当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。

概念:本质上就是一个计数器,去划分临界资源的数量。

每个执行流在进入临界区之前都要先申请信号量,申请成功就有了访问临界资源的权限,当操作完毕后再释放信号量。就是对信号量做加减操作。

信号量的PV操作

p操作:申请信号量为p操作,当申请成功,就获得了访问该临界资源的权限,同时,该临界资源的数量也减少了一份,所以计数器要减一。

v操作:释放信号量为v操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一。

补充:PV操作必须是原子性的。多个执行流访问临界资源也是竞争式的,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。当信号量值为零时被申请,那么该执行流会在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。

3.2信号量函数

初始化信号量函数:

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数解释:

  • sem:需要初始化的信号量。
  • pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
  • value:信号量的初始值(计数器的初始值)。

销毁信号量函数:

int sem_destroy(sem_t *sem);

申请信号量函数(等待):

int sem_wait(sem_t *sem);

 申请成功后值减一

释放信号量函数(发布):

int sem_post(sem_t *sem);

 释放信号量后值加一

上面这些函数调用成功返回0,失败返回-1。

3.3信号量模拟互斥功能

当把信号量的值设置为1时,那么它说明临界资源的数量只有一分,信号量的作用基本等价于互斥锁。

class Sem{
public:Sem(int num){sem_init(&_sem, 0, num);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}
private:sem_t _sem;
};Sem sem(1); //二元信号量
int tickets=1000;
void* getTickets(void* args)
{string s=(char*)args;while(true){sem.P();if(tickets>0){usleep(10000);cout<

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...