-
PHP使用enqueue/amqp-lib实现rabbitmq任务处理
这篇文章主要为大家详细介绍了PHP如何使用enqueue/amqp-lib实现rabbitmq任务处理,文中的示例代码讲解详细,感兴趣的小伙伴可以学习一下
一:拓展安装
composer require enqueue/amqp-lib
文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介绍
1:连接rabbitmq
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',//host
'port' => '5672',//端口
'vhost' => '/',//虚拟主机
'user' => 'admin',//账号
'pass' => 'admin',//密码
]);
$context = $factory->createContext();
2:声明主题
//声明并创建主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
//删除主题
$context->deleteTopic($fooTopic);
3:声明队列
//声明并创建队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
//删除队列
$context->deleteQueue($fooQueue);
4:将队列绑定到主题
1
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:发送消息
//向队列发送消息
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
//向队列发送优先消息
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue(queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
//设置队列的最大优先级
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
->send($fooQueue, $message);
//向队列发送延时消息
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDelayStrategy(new RabbitMqDlxDelayStrategy())
->setDeliveryDelay(5000) //消息延时5秒
->send($fooQueue, $message);
6:消费消息【接收消息】
//消费消息
$consumer = $context->createConsumer($fooQueue);
$message = $consumer->receive();
// process a message
//业务代码
$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
//订阅消费者
$fooConsumer = $context->createConsumer($fooQueue);
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message
//业务代码
$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
return true;
});
$subscriptionConsumer->consume();
//清除队列消息
$queueName = 'rabbitmq';
$queue = $context->createQueue($queueName);
$context->purgeQueue($queue);
三:简单实现
1:发送消息
//连接rabbitmq
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',
'port' => '5672',
'vhost' => '/',
'user' => 'admin',
'pass' => 'admin',
'persisted' => false,
]);
$context = $factory->createContext();
//声明主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
//声明队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
//将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
//发送消息到队列
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
2:消费消息
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',
'port' => '5672',
'vhost' => '/',
'user' => 'admin',
'pass' => 'admin',
'persisted' => false,
]);
$context = $factory->createContext();
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooConsumer = $context->createConsumer($fooQueue);
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message
//业务代码
$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
return true;
});
$subscriptionConsumer->consume();
到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq任务处理的文章就介绍到这了,更多相关PHP rabbitmq任务处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持
原文链接:https://blog.csdn.net/huaweichenai/article/details/136618043
栏目列表
最新更新
vbs能调用的系统对象小结
vbscript网页模拟登录效果代码
VBScript 根据IE窗口的标题输出ESC
杀死指定进程名称的小VBS
通过vbs修改以点结尾的文件的属性为隐藏
查询电脑开关机时间的vbs代码
VBA中的Timer函数用法
ComboBox 控件的用法教程
在windows 64位操作系统上运行32位的vbscri
无法执行vbs脚本中遇到的问题及解决方案
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
uniapp/H5 获取手机桌面壁纸 (静态壁纸)
[前端] DNS解析与优化
为什么在js中需要添加addEventListener()?
JS模块化系统
js通过Object.defineProperty() 定义和控制对象
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比