php basic publish,RabbitMQ入门(PHP语言描述)
一 "Hello World!"
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/send-mq msg*/
/**
 * Publishes one message to the "hello" queue and closes the connection.
 *
 * @param string $argv Message body to send (empty string by default).
 */
public function actionSendMq($argv = '')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Make sure the target queue exists before publishing to it.
    $channel->queue_declare('hello', false, false, false, false);

    $msg = new AMQPMessage($argv);
    // Empty exchange name: the message is routed by the queue name given as routing key.
    $channel->basic_publish($msg, '', 'hello');
    echo " [x] Sent '$argv'" . PHP_EOL;

    $channel->close();
    $connection->close();
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-mq
*/
/**
 * Consumes messages from the "hello" queue and prints each body.
 *
 * Blocks for as long as the channel has registered consumer callbacks.
 */
public function actionReceiveMq()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Declared here as well so the consumer can be started before the producer.
    $channel->queue_declare('hello', false, false, false, false);

    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function ($msg) {
        echo " [x] Received ", $msg->body, "\n";
    };

    // Fourth argument (no_ack) is true: deliveries are not acknowledged manually.
    $channel->basic_consume('hello', '', false, true, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    // Release the channel and connection once consuming ends, like the other consumers do.
    $channel->close();
    $connection->close();
}
二 Work queues
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/new-task msg
*/
/**
 * Publishes one persistent task message to the "my_task_queue" queue.
 *
 * @param string $data Task payload; the worker sleeps one second per '.' in it.
 */
public function actionNewTask($data = 'Hello World!')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Third argument (durable) is true, matching the declaration made by the worker.
    $channel->queue_declare('my_task_queue', false, true, false, false);

    $msg = new AMQPMessage(
        $data,
        array('delivery_mode' => 2) // make message persistent
    );
    $channel->basic_publish($msg, '', 'my_task_queue');
    echo " [x] Sent ", $data, "\n";

    $channel->close();
    $connection->close();
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/worker
*/
/**
 * Worker for "my_task_queue": simulates work and acknowledges each task.
 *
 * Every '.' in the message body costs one second of sleep. The message is
 * acknowledged only after that simulated work has finished.
 */
public function actionWorker()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Same durable declaration as the producer uses.
    $channel->queue_declare('my_task_queue', false, true, false, false);

    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function ($msg) {
        echo " [x] Received ", $msg->body, "\n";
        sleep(substr_count($msg->body, '.'));
        echo " [x] Done", "\n";
        // Manual acknowledgement on the channel that delivered the message.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    // Prefetch count of 1: at most one unacknowledged message per worker at a time.
    $channel->basic_qos(null, 1, null);
    // Fourth argument (no_ack) is false, so the callback above must ack explicitly.
    $channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
三 Publish/Subscribe
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log msg
*/
/**
 * Broadcasts one log message through the "logs" fanout exchange.
 *
 * @param string $data Log text; null or an empty string is replaced by
 *                     "info: Hello World!".
 */
public function actionEmitLog($data = 'Hello World!')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('logs', 'fanout', false, false, false);

    // Compare explicitly instead of empty(): empty('0') is true and would
    // silently swap a legitimate "0" payload for the default text.
    if ($data === null || $data === '') {
        $data = "info: Hello World!";
    }

    $msg = new AMQPMessage($data);
    // No routing key is passed; the message is published to the exchange only.
    $channel->basic_publish($msg, 'logs');
    echo " [x] Sent ", $data, "\n";

    $channel->close();
    $connection->close();
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs
*/
/**
 * Subscribes to the "logs" fanout exchange and prints every message received.
 *
 * A server-named queue is declared for this consumer and bound to the exchange.
 */
public function actionReceiveLogs()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('logs', 'fanout', false, false, false);

    // Empty name: the broker generates the queue name, returned as the first element.
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    $channel->queue_bind($queue_name, 'logs');

    echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
四 Routing
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg
*/
/**
 * Publishes one log message to the "direct_logs" exchange, routed by severity.
 *
 * @param string $argv Severity used as the routing key; falls back to 'info'
 *                     when omitted, null or an empty string.
 * @param string $data Log text to send.
 */
public function actionEmitLogDirect($argv = '', $data = 'Hello World!')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);

    // Explicit comparison replaces the redundant isset() && !empty() pair.
    $severity = ($argv !== null && $argv !== '') ? $argv : 'info';

    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, 'direct_logs', $severity);
    echo " [x] Sent ", $severity, ':', $data, " \n";

    $channel->close();
    $connection->close();
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error
*/
/**
 * Subscribes to the "direct_logs" exchange for the given severities and
 * prints each message as "<routing key>:<body>".
 *
 * Prints a usage line to stderr and exits with status 1 when no severity
 * is supplied.
 *
 * @param string $argv Comma-separated severities, e.g. "info,warning,error".
 */
public function actionReceiveLogsDirect($argv)
{
    // explode() never returns an empty array (explode(',', '') gives array('')),
    // so blank entries are dropped before checking whether anything was given.
    // 'strlen' is used as the filter so a literal "0" entry is kept.
    $severities = array_values(array_filter(
        array_map('trim', explode(',', (string) $argv)),
        'strlen'
    ));

    // Validate before connecting so a bad invocation opens no broker connection.
    if (empty($severities)) {
        file_put_contents('php://stderr', "Usage: yii tools/receive-logs-direct info,warning,error\n");
        exit(1);
    }

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

    // One binding per requested severity.
    foreach ($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }

    echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
五 Topics
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg
*/
/**
 * Publishes one message to the "topic_logs" topic exchange.
 *
 * @param string $routing_key Routing key for the message.
 * @param string $data        Message body.
 */
public function actionTopicsEmitLogDirect($routing_key = 'kern.critical', $data = 'Hello World!')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('topic_logs', 'topic', false, false, false);

    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, 'topic_logs', $routing_key);
    echo " [x] Sent ", $routing_key, ':', $data, " \n";

    $channel->close();
    $connection->close();
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error
*/
/**
 * Subscribes to the "topic_logs" exchange with the given binding keys and
 * prints each message as "<routing key>:<body>".
 *
 * Prints a usage line to stderr and exits with status 1 when no binding key
 * is supplied.
 *
 * @param string $binding_keys Comma-separated binding keys.
 */
public function actionTopicsReceiveLogsDirect($binding_keys = '')
{
    // explode(',', '') yields array(''), never an empty array, so blank
    // entries are filtered out before the "nothing supplied" check.
    $keys = array_values(array_filter(
        array_map('trim', explode(',', (string) $binding_keys)),
        'strlen'
    ));

    // Validate before connecting. The usage text is a fixed string because
    // the original interpolated the exploded array, which prints "Array".
    if (empty($keys)) {
        file_put_contents(
            'php://stderr',
            "Usage: yii tools/topics-receive-logs-direct <binding_key>[,<binding_key>...]\n"
        );
        exit(1);
    }

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

    // One binding per requested key.
    foreach ($keys as $binding_key) {
        $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    }

    echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
六 RPC
生產者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-client 10
*/
/**
 * Requests a Fibonacci number through FibonacciRpcClient and prints the reply.
 *
 * @param int|string $fib Index passed to FibonacciRpcClient::call().
 */
public function actionRpcClient($fib = 10)
{
    $fibonacci_rpc = new FibonacciRpcClient();
    $response = $fibonacci_rpc->call($fib);
    echo " [.] Got ", $response, "\n";
}
消費者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-server
*/
/**
 * RPC server: answers Fibonacci requests arriving on "rpc_queue".
 *
 * Each request body is read as an integer n. The reply carries fib(n) as a
 * string, is published to the request's reply_to queue and echoes the
 * request's correlation_id. The request is acknowledged after the reply
 * has been published.
 *
 * @param string $routing_key Unused; kept so the signature stays compatible.
 * @param string $data        Unused; kept so the signature stays compatible.
 */
public function actionRpcServer($routing_key = 'kern.critical', $data = 'Hello World!')
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare('rpc_queue', false, false, false, false);

    // A closure replaces the nested named function: a named function declared
    // inside a method becomes global, so running this action twice in one
    // process would fail with a redeclaration error. The iterative form also
    // removes the exponential recursion and returns 0 for n <= 0, where the
    // recursive version never terminated for negative input.
    $fib = function ($n) {
        if ($n <= 0) {
            return 0;
        }
        $previous = 0;
        $current = 1;
        for ($i = 1; $i < $n; $i++) {
            $next = $previous + $current;
            $previous = $current;
            $current = $next;
        }
        return $current;
    };

    echo " [x] Awaiting RPC requests\n";

    $callback = function ($req) use ($fib) {
        $n = intval($req->body);
        echo " [.] fib(", $n, ")\n";

        // The correlation_id lets the client match this reply to its request.
        $msg = new AMQPMessage(
            (string) $fib($n),
            array('correlation_id' => $req->get('correlation_id'))
        );

        $reply_channel = $req->delivery_info['channel'];
        $reply_channel->basic_publish($msg, '', $req->get('reply_to'));
        $reply_channel->basic_ack($req->delivery_info['delivery_tag']);
    };

    // Prefetch count of 1: one unacknowledged request per server at a time.
    $channel->basic_qos(null, 1, null);
    $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
相關類:
/**
 * Blocking RPC client for the Fibonacci server listening on "rpc_queue".
 *
 * The constructor opens a connection, declares a server-named reply queue
 * and starts consuming from it. call() publishes a request and waits until
 * the reply carrying the matching correlation id arrives.
 */
class FibonacciRpcClient
{
    private $connection;
    private $channel;
    // Name of the reply queue, generated by the broker.
    private $callback_queue;
    // Body of the matching reply; null while a call is still pending.
    private $response;
    // Correlation id of the request currently in flight.
    private $corr_id;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);

        // Fourth argument (no_ack) is true: on_response() never acknowledges,
        // so with manual ack every reply would stay unacknowledged.
        $this->channel->basic_consume(
            $this->callback_queue, '', false, true, false, false,
            array($this, 'on_response')
        );
    }

    /**
     * Consumer callback: stores the reply body when it answers the pending request.
     *
     * @param object $rep Incoming reply message.
     */
    public function on_response($rep)
    {
        // Strict comparison avoids PHP's numeric-string juggling between two ids.
        if ($rep->get('correlation_id') === $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    /**
     * Sends n to the RPC server and blocks until the matching reply arrives.
     *
     * @param int|string $n Fibonacci index to request.
     * @return int The reply body converted with intval().
     */
    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            (string) $n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback_queue,
            )
        );
        $this->channel->basic_publish($msg, '', 'rpc_queue');

        // Wait on null rather than falsiness: the reply for fib(0) is the
        // string "0", which is falsy and would keep this loop waiting forever.
        while ($this->response === null) {
            $this->channel->wait();
        }

        return intval($this->response);
    }
}
總結
以上是生活随笔為你收集整理的php basic publish,RabbitMQ入门(PHP语言描述)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PHP控制转盘抽奖代码,PHP 根据概率
- 下一篇: php closure invoke,P