




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第PHP+RabbitMQ實現消息隊列的完整代碼為什么使用RabbitMq而不是ActiveMq或者RocketMq?
首先,從業務上來講,我并不要求消息的100%接受率,并且,我需要結合php開發,RabbitMq相較RocketMq,延遲較低(微妙級)。至于ActiveMq,貌似問題較多。RabbitMq對各種語言的支持較好,所以選擇RabbitMq。
先安裝PHP對應的RabbitMQ,這里用的是php_amqp不同的擴展實現方式會有細微的差異.
php擴展地址:/package/amqp
具體以官網為準/getstarted.html
介紹
config.php配置信息
BaseMQ.phpMQ基類
ProductMQ.php生產者類
ConsumerMQ.php消費者類
Consumer2MQ.php消費者2(可有多個)
config.php
return[
//配置
'host'=[
'host'='',
'port'='5672',
'login'='guest',
'password'='guest',
'vhost'='/',
//交換機
'exchange'='word',
//路由
'routes'=[],
];
BaseMQ.php
*CreatedbyPhpStorm.
*User:pc
*Date:2025/12/13
*Time:14:11
namespaceMyObjSummary\rabbitMQ;
/**Member
*AMQPChannel
*AMQPConnection
*AMQPEnvelope
*AMQPExchange
*AMQPQueue
*ClassBaseMQ
*@packageMyObjSummary\rabbitMQ
classBaseMQ
/**MQChannel
*@var\AMQPChannel
public$AMQPChannel;
/**MQLink
*@var\AMQPConnection
public$AMQPConnection;
/**MQEnvelope
*@var\AMQPEnvelope
public$AMQPEnvelope;
/**MQExchange
*@var\AMQPExchange
public$AMQPExchange;
/**MQQueue
*@var\AMQPQueue
public$AMQPQueue;
/**conf
*@var
public$conf;
/**exchange
*@var
public$exchange;
/**link
*BaseMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
$conf=require'config.php';
if(!$conf)
thrownew\AMQPConnectionException('configerror!');
$this-conf=$conf['host'];
$this-exchange=$conf['exchange'];
$this-AMQPConnection=new\AMQPConnection($this-conf);
if(!$this-AMQPConnection-connect())
thrownew\AMQPConnectionException("Cannotconnecttothebroker!\n");
*closelink
publicfunctionclose()
$this-AMQPConnection-disconnect();
/**Channel
*@return\AMQPChannel
*@throws\AMQPConnectionException
publicfunctionchannel()
if(!$this-AMQPChannel){
$this-AMQPChannel=new\AMQPChannel($this-AMQPConnection);
return$this-AMQPChannel;
/**Exchange
*@return\AMQPExchange
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionexchange()
if(!$this-AMQPExchange){
$this-AMQPExchange=new\AMQPExchange($this-channel());
$this-AMQPExchange-setName($this-exchange);
return$this-AMQPExchange;
/**queue
*@return\AMQPQueue
*@throws\AMQPConnectionException
*@throws\AMQPQueueException
publicfunctionqueue()
if(!$this-AMQPQueue){
$this-AMQPQueue=new\AMQPQueue($this-channel());
return$this-AMQPQueue;
/**Envelope
*@return\AMQPEnvelope
publicfunctionenvelope()
if(!$this-AMQPEnvelope){
$this-AMQPEnvelope=new\AMQPEnvelope();
return$this-AMQPEnvelope;
}
ProductMQ.php
//生產者P
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classProductMQextendsBaseMQ
private$routes=['hello','word'];//路由key
*ProductMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**只控制發送成功不接受消費者是否收到
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionrun()
//頻道
$channel=$this-channel();
//創建交換機對象
$ex=$this-exchange();
//消息內容
$message='productmessage'.rand(1,99999);
//開始事務
$channel-startTransaction();
$sendEd=true;
foreach($this-routesas$route){
$sendEd=$ex-publish($message,$route);
echo"SendMessage:".$sendEd."\n";
if(!$sendEd){
$channel-rollbackTransaction();
$channel-commitTransaction();//提交事務
$this-close();
die;
try{
(newProductMQ())-run();
}catch(\Exception$exception){
var_dump($exception-getMessage());
}
ConsumerMQ.php
//消費者C
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classConsumerMQextendsBaseMQ
private$q_name='hello';//隊列名
private$route='hello';//路由key
*ConsumerMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**接受消息如果終止重連時會有消息
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
*@throws\AMQPQueueException
publicfunctionrun()
//創建交換機
$ex=$this-exchange();
$ex-setType(AMQP_EX_TYPE_DIRECT);//direct類型
$ex-setFlags(AMQP_DURABLE);//持久化
//echo"ExchangeStatus:".$ex-declare()."\n";
//創建隊列
$q=$this-queue();
//var_dump($q-declare());exit();
$q-setName($this-q_name);
$q-setFlags(AMQP_DURABLE);//持久化
//echo"MessageTotal:".$q-declareQueue()."\n";
//綁定交換機與隊列,并指定路由鍵
echo'QueueBind:'.$q-bind($this-exchange,$this-route)."\n";
//阻塞模式接收消息
echo"Message:\n";
while(True){
$q-consume(function($envelope,$queue){
$msg=$envelope-getBody();
echo$msg."\n";//處理消息
$queue-ack($envelope-getDeliveryTag());//
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 細節復習無人機駕駛員考試試題及答案
- 標準廠房項目施工期環境影響及防治措施
- 盯緊重點審計試題及答案
- 2025年建筑行業發展試題及答案
- 消防設施日常檢查與維護試題及答案
- 高級會計預算管理試題及答案解析
- 針對無人機執照考試的解題試題及答案
- 22025年護師考試實習心得試題及答案
- 消防安全監測技術升級應用試題及答案
- 經典考點對比中級審計師試題及答案
- 霧都孤兒讀書報告
- 職業生涯規劃家庭影響因素
- 2024年江蘇交通文化傳媒有限公司招聘筆試參考題庫含答案解析
- 安心護行 從個案分析看創傷骨科患者VTE管理低分子肝素合理應用版本
- JGT501-2016 建筑構件連接處防水密封膏
- 實驗 驗證牛頓第二定律
- 鉆孔水文地質工程地質綜合編錄一覽表模板
- 備用柴油發電機定期啟動試驗記錄表
- 國企食堂運作方案
- 二年級上冊心理健康教育說課稿-面對批評 全國通用
- 勞務派遣合同示范文本(4篇)
評論
0/150
提交評論