消费 job
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
<?php ini_set( 'default_socket_timeout' , 86400*7); ini_set( 'memory_limit' , '256M' ); // 消费队列消息 require_once( './vendor/autoload.php' ); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk( '127.0.0.1' ,11300); $tubeName = 'email_list' ; while ( true ) { // 获取队列信息, reserve 阻塞获取 $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job->getData(); /* TODO 逻辑操作 */ /* 处理完成,删除 job */ $pheanstalk->delete( $job ); } } |
default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
就是这样的代码,供参考:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public function watchJob() { $job = $ this ->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job->getData(); $ this ->subscribe( $job_data ); $ this ->pheanstalk->delete( $job ); /* 继续 Watch 下一个 job */ $ this ->watchJob(); } else { $ this ->log->error( 'reserve false' , 'reserve false' ); } } |