Hyperf连接使用RabbitMQ消息中间件
composer require hyperf/amqp
composer require hyperf/command
假设已经在rabbitmq设置了交换机exchange_test和队列queue_test
['host' => '127.0.0.1',//rabbitmq服务的地址'port' => 5672,'user' => 'user','password' => '123456','vhost' => '/','concurrent' => ['limit' => 1,],'pool' => ['connections' => 1,],'params' => ['insist' => false,'login_method' => 'AMQPLAIN','login_response' => null,'locale' => 'en_US','connection_timeout' => 3.0,'read_write_timeout' => 6.0,'context' => null,'keepalive' => false,'heartbeat' => 3,'close_on_destruct' => false,],],'pool2' => [...]
];
php bin/hyperf.php gen:amqp-producer DemoProducer
exchange是交换机,routingKey是队列名
public function __construct($data){//将收到的数据加入队列$this->plyload = $data;}
}
php bin/hyperf.php gen:command FooCommand
代码
/*** 执行的命令行** @var string*/protected $name = 'foo:command';public function handle(){//协程代码,创建1000个协程分别处理$wg = new \Hyperf\Utils\WaitGroup();$wg->add(1000);// 计数器加1000for($i=0;$i<1000;$i++){// 创建协程$ico(function () use ($wg) {//amqp代码,将数据加入生产者队列$message = new DemoProducer(['id'=>$i]);$producer = ApplicationContext::getContainer()->get(Producer::class);$result = $producer->produce($message); // 计数器减一$wg->done();});}// 等待所有协程运行完成$wg->wait();}
}
调用命令行,来生产数据
php bin/hyperf.php foo:command
至此,进入rabbitmq后台,对应的队列里就会有数据。
php bin/hyperf.php gen:amqp-consumer DemoConsumer
代码解释如上,多的queue也是队列名,num是进程数
public function consumeMessage($data, AMQPMessage $message): string{print_r($data);return Result::ACK;}
}
重启框架会自动调用消费者
php bin/hyperf.php start
原创码字不易,喜欢请收藏关注
部分参考自:https://www.bilibili.com/video/BV1de4y1E7Ya/?vd_source=36102b089bcd7ff8177499ba833633e0