




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
RabbitMQ介紹
RabbitMQ介紹
1目錄什么是MQMQ有什么優(yōu)勢哪些情況下建議使用MQ什么是RabbitMQ選擇RabbitMQ理由RabbitMQ服務(wù)場景RabbitMQ結(jié)構(gòu)圖RabbitMQ名詞解釋目錄什么是MQ2目錄RabbitMQ客戶端使用流程(productor/cunsumer)Productor范例代碼及注意事項(xiàng)Consumer范例代碼及注意事項(xiàng)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)RabbitMQ與Spring整合范例代碼目錄RabbitMQ客戶端使用流程(productor/cu3什么是MQ?MQ全稱為MessageQueue,消息隊(duì)列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法(消息傳遞),一般用作進(jìn)程間通訊MQ有什么優(yōu)勢?MQ本身是異步的,往隊(duì)列里發(fā)送消息后無需等待,不同于通信協(xié)議。如HTTP協(xié)議(同步),客戶端發(fā)出請求后必須等待服務(wù)器回應(yīng)哪些情況下建議使用MQ高并發(fā)應(yīng)用來不及處理,實(shí)時(shí)性要求不高多應(yīng)用之間異步通信,且耗時(shí)操作什么是MQ?4什么是RabbitMQRabbitMQ是由Erlang(愛立信公司)語言開發(fā),實(shí)現(xiàn)AdvancedMessageQueuingProtocol(AMQP高級消息隊(duì)列協(xié)議)的消息中間件。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。什么是RabbitMQRabbitMQ是由Erlang(愛立5選擇RabbitMQ理由Reliability可靠性Exchange交換機(jī)、Queue隊(duì)列、Message消息持久化、高可用性FlexibleRouting靈活路由Clustering集群分為Disc(硬盤)與RAM(內(nèi)存),保證至少一臺DiscHighlyAvailableQueues高可用隊(duì)列與集群結(jié)合使用,設(shè)置隊(duì)列間的消息同步ManagementUI管理界面選擇RabbitMQ理由Reliability可靠性6異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電/隊(duì)列爆滿)單機(jī)丟失RabbitMQ支持集群,多臺機(jī)器隊(duì)列同步,丟失消息可從其他機(jī)器上獲取網(wǎng)絡(luò)丟失掉電RabbitMQ支持持久化,數(shù)據(jù)保存在硬盤上隊(duì)列爆滿RabbitMQ支持流控機(jī)制,可修改內(nèi)存大小,默認(rèn)為機(jī)器內(nèi)存的40%異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電7RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(異步)WorkQueues(消息均勻分配消息給消費(fèi)者)Publish/Subscribe(廣播模式,消息分發(fā)給所有的消費(fèi)者)Routing(消費(fèi)者接收消息由路由規(guī)則決定,簡單路由名)Topics(消費(fèi)者接收消息由路由規(guī)則決定,路由規(guī)則名比較復(fù)雜)RPC遠(yuǎn)程調(diào)用(同步)RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(8RabbitMQ結(jié)構(gòu)圖RabbitMQ結(jié)構(gòu)圖9RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例如RabbitMQ服務(wù)Vhost:虛擬主機(jī),默認(rèn)為“/”,一個(gè)broker里可以有多個(gè)vhost,區(qū)分不同用戶權(quán)限,類似java的命令空間Connection:應(yīng)用程序與broker連接,可有多個(gè)連接Channel:消息通道,connection中可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù),所有操作都在channel中進(jìn)行。RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例10RabbitMQ名詞解釋Exchange:消息交換機(jī),channel中可有多個(gè),用于投遞消息。應(yīng)用程序發(fā)送消息時(shí)先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,不是直接給隊(duì)列Queue:隊(duì)列,用于存放消息Message:消息,應(yīng)用程序需要發(fā)送的數(shù)據(jù)Bind:根據(jù)routingKey綁定exchange與queue規(guī)則,決定消息發(fā)送的方向RabbitMQ名詞解釋Exchange:消息交換機(jī),cha11RabbitMQ對象間關(guān)系broker可多個(gè)Connection可多個(gè)Channel可多個(gè)Exchange可多個(gè)QueuemessageRabbitMQ對象間關(guān)系broker可多個(gè)可多個(gè)可多個(gè)可多12Exchange主要3種類型Fanout:不處理路由鍵(沒有routingKey),只需把隊(duì)列綁定到交換機(jī)上。發(fā)送到交換機(jī)的消息都會(huì)轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上,類似于廣播,轉(zhuǎn)發(fā)消息是最快的Exchange主要3種類型Fanout:不處理路由鍵(沒有13Exchange主要3種類型Direct:處理路由鍵(有routingKey)。將一隊(duì)列綁定到交換機(jī)上,該消息需與一個(gè)特定的路由鍵(routingKey)完全匹配Exchange主要3種類型Direct:處理路由鍵(有ro14Exchange主要3種類型Topic:與direct類似,功能更強(qiáng),支持模糊綁定*表示通配一個(gè)詞#表示通配0個(gè)或多個(gè)詞Exchange主要3種類型Topic:與direct類似,15RabbitMQ客戶端使用流程(productor/cunsumer)RabbitMQ客戶端使用流程(productor/cuns16Productor范例代碼及注意事項(xiàng)//以exchange為direct為例package
com.rabbitmq.test.ow.demo2;import
java.io.IOException;import
com.rabbitmq.client.BlockedListener;import
com.rabbitmq.client.Channel;import
com.rabbitmq.client.Connection;import
com.rabbitmq.client.ConnectionFactory;import
com.rabbitmq.client.MessageProperties;import
com.rabbitmq.client.ShutdownListener;import
com.rabbitmq.client.ShutdownSignalException;import
com.rabbitmq.client.impl.AMQCommand;import
com.rabbitmq.client.impl.AMQImpl;/***
Title:
Producer.java*
Description:【生產(chǎn)者樣例】*
@author:
zengqiang.yang*
@date:
2013-12-25**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司
*/public
class
Producer{
//交換機(jī)命名規(guī)范:e_模塊_其他
private
static
final
StringEXCHANGE_NAME
="e_tyb_test";
//routingkey命名規(guī)范:r_模塊_其他
private
static
final
StringROUTING_KEY
="r_tyb_test";
public
static
void
main(String[]argv)throws
Exception{
//注意:factory應(yīng)為單例,不要每次取消息新建一次對象
ConnectionFactoryfactory=new
ConnectionFactory();
factory.setHost("90");
factory.setPort(5672);//默認(rèn)端口Productor范例代碼及注意事項(xiàng)//以exchange為17Productor范例代碼及注意事項(xiàng)
factory.setUsername("guest");//默認(rèn)用戶名
factory.setPassword("guest");//默認(rèn)密碼
factory.setVirtualHost("/");//默認(rèn)虛擬主機(jī),區(qū)分權(quán)限
//設(shè)置心跳時(shí)間,防止長時(shí)間未活動(dòng)被防火墻殺死,默認(rèn)600秒,單位:秒//
factory.setRequestedHeartbeat(60*4);
//連接超時(shí)時(shí)間,單位:毫秒//
factory.setConnectionTimeout(1000*2);
//注意:connection應(yīng)為單例,不要每次取消息新建一次對象
Connectionconnection=factory.newConnection();
//監(jiān)聽connection關(guān)閉異常
connection.addShutdownListener(new
ShutdownListener(){ @Override
public
void
shutdownCompleted(ShutdownSignalExceptioncause){
//connection異常
if
(cause.isHardError()){
System.out.println("connection異常:["
+cause.getMessage()+"]");
//程序引起的異常,如:connection.close()
if
(cause.isInitiatedByApplication()){
System.out.println("connection關(guān)閉異常,重連...begin...");//
connection=factory.newConnection();
System.out.println("connection關(guān)閉異常,重連...end...");
}else{//rabbitmq服務(wù)引起的異常
AMQCommandamqCommand=(AMQCommand)cause.getReason();
if(amqCommand.getMethod()instanceof
AMQImpl.Connection.Close){
AMQImpl.Connection.Closeclose=(AMQImpl.Connection.Close)amqCommand.getMethod();
if(320==close.getReplyCode()){//rabbitmq服務(wù)器強(qiáng)制關(guān)閉
System.out.println("connection關(guān)閉異常,請檢查rabbitmq服務(wù)器是否正常啟動(dòng)!");
}}}}}});Productor范例代碼及注意事項(xiàng) factory.set18Productor范例代碼及注意事項(xiàng)
//監(jiān)聽connection阻塞異常
connection.addBlockedListener(new
BlockedListener(){
@Override
public
void
handleUnblocked()throws
IOException{
System.out.println("connection已解除阻塞!");
}
@Override
public
void
handleBlocked(Stringreason)throws
IOException{
System.out.println("connection阻塞原因:["+reason+"],請檢查內(nèi)存是否夠!");
}
});
//注意:channel應(yīng)為單例,不要每次取消息新建一次對象
Channelchannel=connection.createChannel();
//監(jiān)聽channel關(guān)閉異常
channel.addShutdownListener(new
ShutdownListener(){
@Override
public
void
shutdownCompleted(ShutdownSignalExceptioncause){
//channel異常
if
(!cause.isHardError()){System.out.println("channel異常:["
+cause.getMessage()+"]");
}
}
});
Productor范例代碼及注意事項(xiàng)19Productor范例代碼及注意事項(xiàng)
/** *創(chuàng)建交換機(jī) *exchange 交換機(jī)名 *type
交換機(jī)類型 fanout:廣播模式,所有消費(fèi)者都能收到生產(chǎn)者發(fā)送的消息,速度更快,不需設(shè)置routingkey * direct:只有與routingkey配置的消費(fèi)者才能收到消息 * topic:與direct相同,只是支持模糊配置,類似正則表達(dá)式,功能更強(qiáng), **表示通配一個(gè)詞,#表示通配0個(gè)或多個(gè)詞(注意:是詞,不是字母) * *durable
durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失; * durable=false,相反 * */
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
//模擬發(fā)送消息
for
(int
i=0;i<100;i++){
Stringmessage="HelloWorld!";
message+=i;
/***exchange 交換機(jī)名,""為默認(rèn)交換機(jī),direct類型
*routingKey
exchange為direct、topic類型時(shí)指定routingKey,exchange為fanout類型時(shí)指定queueName隊(duì)列名 *props
MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化,rabbitmq服務(wù)重啟消息不會(huì)丟失;null:非持久化 *body 發(fā)送消息 */
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("[x]Sent'"
+message+"'");
}
//關(guān)閉連接//
channel.close();//
connection.close();
}}Productor范例代碼及注意事項(xiàng) /**20Productor范例代碼及注意事項(xiàng)ConnectionFactory、Connection、Channel注意單例控制默認(rèn)心跳時(shí)間為10分鐘,發(fā)現(xiàn)個(gè)人賬戶環(huán)境防火墻為5分鐘,有可能被防火墻殺死,建議設(shè)置4分鐘connectionFactory.setRequestedHeartbeat(4*60);添加Shutdown、Blocked異常監(jiān)聽,Shutdown后重連機(jī)制、Blocked后的日志輸出Productor范例代碼及注意事項(xiàng)ConnectionFa21Consumer范例代碼及注意事項(xiàng)//以exchange為direct為例package
com.rabbitmq.test.ow.demo2;import
java.io.IOException;import
java.util.HashMap;import
java.util.Map;import
com.rabbitmq.client.BlockedListener;import
com.rabbitmq.client.Channel;import
com.rabbitmq.client.Connection;import
com.rabbitmq.client.ConnectionFactory;import
com.rabbitmq.client.QueueingConsumer;import
com.rabbitmq.client.ShutdownListener;import
com.rabbitmq.client.ShutdownSignalException;import
com.rabbitmq.client.impl.AMQCommand;import
com.rabbitmq.client.impl.AMQImpl;/***
Title:
Consumer.java*
Description:【消費(fèi)者樣例】*
@author:
zengqiang.yang*
@date:
2013-12-25**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司
*/Consumer范例代碼及注意事項(xiàng)//以exchange為d22Consumer范例代碼及注意事項(xiàng)public
class
Consumer{
//交換機(jī)命名規(guī)范:e_模塊_其他
private
static
final
StringEXCHANGE_NAME
="e_tyb_test";
//隊(duì)列命名規(guī)范:q_模塊_其他
private
static
final
StringQUEUE_NAME
="q_tyb_test";
//routingkey命名規(guī)范:r_模塊_其他
private
static
final
StringROUTING_KEY
="r_tyb_test";
public
static
void
main(String[]argv)throws
Exception{
//注意:factory應(yīng)為單例,不要每次取消息新建一次對象
ConnectionFactoryfactory=new
ConnectionFactory();
factory.setHost("90");
factory.setPort(5672);//默認(rèn)端口
factory.setUsername("guest");//默認(rèn)用戶名
factory.setPassword("guest");//默認(rèn)密碼
factory.setVirtualHost("/");//默認(rèn)虛擬主機(jī),區(qū)分權(quán)限
//設(shè)置心跳時(shí)間,防止長時(shí)間未活動(dòng)被防火墻殺死,默認(rèn)600秒,單位:秒//
factory.setRequestedHeartbeat(60*4);Consumer范例代碼及注意事項(xiàng)publicclass23Consumer范例代碼及注意事項(xiàng)
//連接超時(shí)時(shí)間,單位:毫秒//
factory.setConnectionTimeout(1000*2);
//注意:connection應(yīng)為單例,不要每次取消息新建一次對象
Connectionconnection=factory.newConnection();
//監(jiān)聽connection關(guān)閉異常
connection.addShutdownListener(new
ShutdownListener(){
@Override
public
void
shutdownCompleted(ShutdownSignalExceptioncause){
//connection異常
if
(cause.isHardError()){
System.out.println("connection異常:["
+cause.getMessage()+"]");
//程序引起的異常,如:connection.close()
if
(cause.isInitiatedByApplication()){
System.out.println("connection關(guān)閉異常,重連...begin...");//
connection=factory.newConnection();
System.out.println("connection關(guān)閉異常,重連...end...");
}
Consumer范例代碼及注意事項(xiàng) //連接超時(shí)時(shí)間,單位:24Consumer范例代碼及注意事項(xiàng)else{//rabbitmq服務(wù)引起的異常
AMQCommandamqCommand=(AMQCommand)cause.getReason();
if(amqCommand.getMethod()instanceof
AMQImpl.Connection.Close){
AMQImpl.Connection.Closeclose=(AMQImpl.Connection.Close)amqCommand.getMethod();
if(320==close.getReplyCode()){//rabbitmq服務(wù)器強(qiáng)制關(guān)閉
System.out.println("connection關(guān)閉異常,請檢查rabbitmq服務(wù)器是否正常啟動(dòng)!");
}
}
}
}
}
});
//監(jiān)聽connection阻塞異常
connection.addBlockedListener(new
BlockedListener(){
@Override
public
void
handleUnblocked()throws
IOException{
System.out.println("connection已解除阻塞!");
}
@Override
public
void
handleBlocked(Stringreason)throws
IOException{
System.out.println("connection阻塞原因:["+reason+"],請檢查內(nèi)存是否夠!");
}
});Consumer范例代碼及注意事項(xiàng)else{//rabbit25Consumer范例代碼及注意事項(xiàng)
//注意:channel應(yīng)為單例,不要每次取消息新建一次對象
Channelchannel=connection.createChannel();
//監(jiān)聽channel關(guān)閉異常
channel.addShutdownListener(new
ShutdownListener(){
@Override
public
void
shutdownCompleted(ShutdownSignalExceptioncause){
//channel異常
if
(!cause.isHardError()){
System.out.println("channel異常:["
+cause.getMessage()+"]");
}
}
});
//同一時(shí)間一個(gè)消費(fèi)者只能接收一條消息,web管理界面隊(duì)列Unacked數(shù)值,如多個(gè)隊(duì)列數(shù)值累加
channel.basicQos(1);
Consumer范例代碼及注意事項(xiàng)26Consumer范例代碼及注意事項(xiàng)/***創(chuàng)建交換機(jī)*exchange 交換機(jī)名
*type 交換機(jī)類型 fanout:廣播模式,所有消費(fèi)者都能收到生產(chǎn)者發(fā)送的消息,速度更快,不需設(shè)置routingkey* direct:只有與routingkey配置的消費(fèi)者才能收到消息* topic:與direct相同,只是支持模糊配置,類似正則表達(dá)式,功能更強(qiáng),**表示通配一個(gè)詞,#表示通配0個(gè)或多個(gè)詞(注意:是詞,不是字母)* *durable
durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;* durable=false,相反**/
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
/***創(chuàng)建隊(duì)列*queue 隊(duì)列名*durable 持久化隊(duì)列,true:服務(wù)器重啟隊(duì)列不會(huì)丟失;false:反之*exclusive 排他性,true:首次申明的connection連接下可見;* false:所有connection連接下都可見*connection關(guān)閉后隊(duì)列自動(dòng)刪除,忽略隊(duì)列持久化*autoDelete
true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除*arguments 可指定隊(duì)列里消息總數(shù)*/
Map<String,Object>args
=new
HashMap<String,Object>();//
args.put("x-max-length",100);
//設(shè)置隊(duì)列里的最大消息數(shù)//
args.put("x-message-ttl",1000*10);
//設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒//
channel.queueDeclare(QUEUE_NAME,true,false,false,args);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer范例代碼及注意事項(xiàng)/**27Consumer范例代碼及注意事項(xiàng)
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
System.out.println("[*]Waitingformessages.ToexitpressCTRL+C");
//聲明一個(gè)消費(fèi)者
QueueingConsumerconsumer=new
QueueingConsumer(channel);
/** *queue 隊(duì)列名 *autoAck
true:消息從隊(duì)列刪除,不管是否正確處理;false:消息不從隊(duì)列刪除,需要ack響應(yīng) *callback 消費(fèi)者 */
channel.basicConsume(QUEUE_NAME,false,consumer);
while
(true){
//循環(huán)獲取消息
QueueingConsumer.Deliverydelivery=consumer.nextDelivery();
Stringmessage=new
String(delivery.getBody());
System.out.println("[x]Received'"
+message+"'");
doWork(message);
System.out.println("[x]Done");
//確認(rèn),消息從隊(duì)列中刪除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//拒絕,消息不從隊(duì)列刪除//
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
}
}
private
static
void
doWork(Stringtask)throws
InterruptedException{
for
(char
ch:task.toCharArray()){
if
(ch==''){
Thread.sleep(1000);
}
}
}}Consumer范例代碼及注意事項(xiàng)28Consumer范例代碼及注意事項(xiàng)ConnectionFactory、Connection、Channel注意單例控制默認(rèn)心跳時(shí)間為10分鐘,發(fā)現(xiàn)個(gè)人賬戶環(huán)境防火墻為5分鐘,有可能被防火墻殺死,建議設(shè)置4分鐘connectionFactory.setRequestedHeartbeat(4*60);添加Shutdown、Blocked異常監(jiān)聽,Shutdown后重連機(jī)制、Blocked后的日志輸出對于比較重要的消息,消費(fèi)者啟動(dòng)應(yīng)在生產(chǎn)者之前,避免生產(chǎn)者發(fā)送消息時(shí)消費(fèi)者未啟動(dòng)消息丟失Consumer范例代碼及注意事項(xiàng)ConnectionFac29開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理ConnectionFactory、Connection、Channel切勿每發(fā)送一條消息創(chuàng)建一次,因用單例控制Connection默認(rèn)心跳 //默認(rèn)600秒(10分鐘)connectionFactory.setRequestedHeartbeat(240);Exchange持久化 //durable=true持久化,durable=false非持久化 channel.exchangeDeclare(EXCHANGE_NAME,“direct”,true);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理ConnectionFact30開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue持久化 //durable=true持久化,durable=false非持久化 channel.queueDeclare(QUEUE_NAME,true,false,false,null);Message持久化 //props=MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化 channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue持久化31開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列排他性 //exclusive=true首次申明的connection連接下可見,
//exclusive=false所有connection連接下都可見 channel.queueDeclare(QUEUE_NAME,true,true,false,null);Queue隊(duì)列里消息數(shù)大小設(shè)置 Map<String,Object>args=newHashMap<String,Object>(); args.put("x-max-length",50);//設(shè)置隊(duì)列里的最大消息數(shù) channel.queueDeclare(QUEUE_NAME,true,false,false,args);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列排他性32開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列里消息有效性 Map<String,Object>args=newHashMap<String,Object>(); //設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒 args.put("x-message-ttl",1000*10); channel.queueDeclare(QUEUE_NAME,true,false,false,args);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列里消息有效性33開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理關(guān)閉異常ShutdownSignalException通過Connection.addShutdownListener監(jiān)聽,可分為程序引起(如:connection.close())和服務(wù)器引起兩大異常,程序引起建議捕獲異常后有重連機(jī)制阻塞異常通過connection.addBlockedListener監(jiān)聽,當(dāng)使用內(nèi)存接近最大內(nèi)存時(shí),消息會(huì)阻塞(handleBlocked方法
捕獲),可通過調(diào)整內(nèi)存大小解除阻塞(handleUnblocked方法捕獲)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理關(guān)閉異常ShutdownSi34RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加用戶并指定密碼./rabbitmqctladd_user用戶名
密碼設(shè)置用戶權(quán)限./rabbitmqctlset_user_tags用戶名
權(quán)限組權(quán)限組4類Management PolicymakerMonitoringAdministratorRabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加用戶并指定密碼35Management查看rabbitmq服務(wù)總消息數(shù)、當(dāng)前與多少機(jī)器相連(connection)、建了多少通道(channel)、建立了多少隊(duì)列數(shù)等Management36Policymaker比management多了一個(gè)Admin選項(xiàng)卡中的policies標(biāo)簽,可添加鏡像隊(duì)列,滿足集群情況下隊(duì)列的同步Policymaker37Monitoring比management多了Overview選項(xiàng)卡中的Nodes、Portsandcontexts標(biāo)簽,可查看當(dāng)前打開socket連接數(shù)、rabbitmq內(nèi)存、硬盤使用情況;amqp端口(默認(rèn)5672)及Webmanagement管理界面端口(默認(rèn)15672,55672是rabbitmq3.0之前版本端口號)Monitoring38Administrator最高權(quán)限,用戶、虛擬主機(jī)、策略等都可設(shè)置Administrator39RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加vhost./rabbitmqctladd_vhostvhost名設(shè)置vhost權(quán)限./rabbitmqctlset_permissions-pvhost名
用戶名".*"".*"".*“Vhost下的所有資源都有讀寫權(quán)限RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加vhost40RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加鏡像隊(duì)列策略./rabbitmqctl-n節(jié)點(diǎn)名set_policy-pvhost名
策略名"^q\_tyb\_"'{"ha-mode":"all","ha-sync-mode":"automatic"}‘^q\_tyb\_設(shè)置隊(duì)列名規(guī)則ha-mode:all所有機(jī)器都接收(集群環(huán)境下)ha-sync-mode:automatic某一臺機(jī)器停止重啟后,自動(dòng)同步隊(duì)列里的消息RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加鏡像隊(duì)列策略41RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)最大打開文件數(shù)及socket連接數(shù)如ConnectionFactory、Connection、Channel未控制單例,每發(fā)送一條消息就創(chuàng)建一次連接,file、socket數(shù)也隨之增加,file與socket總數(shù)比例大約80%RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)最大打開文件數(shù)及sock42RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)rabbitmq內(nèi)存、硬盤使用空間rabbitmq最大內(nèi)存默認(rèn)為機(jī)器內(nèi)存的40%rabbitmq磁盤空間默認(rèn)為rabbitmq數(shù)據(jù)所在分區(qū)的可用空閑空間RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)rabbitmq內(nèi)存、硬43RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Exchange交換機(jī)的持久化交換機(jī)持久化,rabbitmq服務(wù)重啟exchange依然存在,數(shù)據(jù)保存在硬盤上RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Exchange交換機(jī)的44RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Queue隊(duì)列的持久化隊(duì)列持久化,rabbitmq服務(wù)重啟queue依然存在,數(shù)據(jù)保存在硬盤上
RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Queue隊(duì)列的持久化45RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列最大消息數(shù)(可選)當(dāng)隊(duì)列里消息數(shù)到達(dá)最大值時(shí),后面消息會(huì)替換原先的消息,隊(duì)列里消息是先進(jìn)先出原則代碼Map<String,Object>args=newHashMap<String,Object>(); args.put("x-max-length",50);//設(shè)置隊(duì)列里的最大消息數(shù) channel.queueDeclare(QUEUE_NAME,true,false,false,args);RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列最大消息數(shù)(可選)46示意圖示意圖47RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)消息有效性(可選)消息進(jìn)入隊(duì)列過了指定時(shí)間(單位:毫秒),自動(dòng)刪除消息代碼 Map<String,Object>args=newHashMap<String,Object>(); //設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒 args.put("x-message-ttl",1000*10); channel.queueDeclare(QUEUE_NAME,true,false,false,args);RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)消息有效性(可選)48示意圖示意圖49RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)多機(jī)集群隊(duì)列同步如果一臺RabbitMQ服務(wù)掛了,其他機(jī)器上會(huì)有一份副本。RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)多機(jī)集群隊(duì)列同步50RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列命名規(guī)范,如:q_模塊_其他影響隊(duì)列同步,多機(jī)集群間隊(duì)列的同步需要使用policy策略來保證RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列命名規(guī)范,如:q_模51RabbitMQ與Spring整合范例代碼生產(chǎn)者applicationContext-rabbitmq-producer.xml配置文件<?xml
version="1.0"
encoding="UTF-8"?><beans
xmlns=""
xmlns:xsi=""xmlns:context=""
xmlns:rabbit=""
xsi:schemaLocation="
">
<!--連接服務(wù)配置-->
<rabbit:connection-factory
id="connectionFactory"
host="90"
username="guest"
password="guest"
port="5672"
/>
<rabbit:admin
connection-factory="connectionFactory"
/>
RabbitMQ與Spring整合范例代碼生產(chǎn)者applic52RabbitMQ與Spring整合范例代碼<!--queue隊(duì)列聲明-->
<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->
<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->
<!--排他性,exclusive=true:首次申明的connection連接下可見;exclusive=false:所有connection連接下都可見-->
<rabbit:queue
id="q_tyb_test2"
durable="true"
auto-delete="false"
exclusive="false"
name="q_tyb_test2"
/>
<!--exchangequeuebingingkey綁定-->
<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->
<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->
<rabbit:direct-exchangename="e_tyb_test2"
durable="true"
auto-delete="false"
id="e_tyb_test2">
<rabbit:bindings>
<rabbit:binding
queue="q_tyb_test2"
key="r_tyb_test2"
/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template
id="amqpTemplate"
exchange="e_tyb_test2"
connection-factory="connectionFactory"
/></beans>RabbitMQ與Spring整合范例代碼53RabbitMQ與Spring整合范例代碼//生產(chǎn)者packagecom.rabbitmq.test.ow.spring;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.context.ApplicationContext;importorg.springframework.context.support.;/***
Title:
Producer.java*
Description:【rabbitmq與spring整合-生產(chǎn)者】*
@author:
zengqiang.yang*
@date:
2013-12-30**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司
*/public
classProducer{public
static
voidmain(String[]args){ApplicationContextcontext=new
("src/main/resources/applicationContext-rabbitmq-producer.xml");RabbitMQ與Spring整合范例代碼//生產(chǎn)者54RabbitMQ與Spring整合范例代碼AmqpTemplateamqpTemplate=(AmqpTemplate)context.getBean("amqpTemplate");//模擬發(fā)送消息for(inti=0;i<100;i++){Stringmessage="HelloWorld!";message+=i;/***r_tyb_test2routingkey*message發(fā)送消息*/amqpTemplate.convertAndSend("r_tyb_test2",message);System.out.println("Sent:"+message);}}}RabbitMQ與Spring整合范例代碼55RabbitMQ與Spring整合范例代碼消費(fèi)者applicationContext-rabbitmq-consumer.xml配置文件<?xml
version="1.0"
encoding="UTF-8"?><beans
xmlns=""
xmlns:xsi=""
xmlns:context=""
xmlns:rabbit=""
xsi:schemaLocation="
">
<!--連接服務(wù)配置-->
<rabbit:connection-factory
id="connectionFactory"
host="90"
username="guest"
password="guest"
port="5672"
/>
<rabbit:admin
connection-factory="connectionFactory"
/>
<!--queue隊(duì)列聲明-->
<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->
<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->
<!--排他性,exclusive=true:首次申明的connection連接下可見;exclusive=false:所有connection連接下都可見-->RabbitMQ與Spring整合范例代碼消費(fèi)者applic56RabbitMQ與Spring整合范例代碼
<rabbit:queue
id="q_tyb_test2"
durable="true"
auto-delete="false"
exclusive="false"
name="q_tyb_test2"
/>
<!--exchangequeuebinging
key綁定-->
<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->
<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->
<rabbit:direct-exchange
name="e_tyb_test2"
durable="true"
auto-delete="false"
id="e_tyb_test2">
<rabbit:bindings>
<rabbit:binding
queue="q_tyb_test2"
key="r_tyb_test2"
/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義消費(fèi)者監(jiān)聽-->
<bean
id="consumerLitener"
class="com.rabbitmq.test.ow.spring.ConsumerLitener"></bean>
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto">
<rabbit:listener
queues="q_tyb_test2"
ref="consumerLitener"
/>
</rabbit:listener-container></beans>RabbitMQ與Spring整合范例代碼 <rabbit:57RabbitMQ與Spring整合范例代碼//消費(fèi)者packagecom.rabbitmq.test.ow.spring;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageListener;importorg.springframework.context.ApplicationContext;importorg.springframework.context.support.;/***
Title:
ConsumerLitener.java*
Description:【rabbitmq與spring整合-消費(fèi)者】*
@author:
zengqiang.yang*
@date:
2013-12-30**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司
*/public
classConsumerLitenerimplementsMessageListener{@Overridepublic
voidonMessage(Messagearg0){RabbitMQ與Spring整合范例代碼//消費(fèi)者58RabbitMQ與Spring整合范例代碼Stringmessage=newString(arg0.getBody());try{Thread.sleep(1000);System.out.println("Recv:"+message);}catch(InterruptedExceptione){e.printStackTrace();}}public
static
voidmain(String[]args){
//啟動(dòng)容器ApplicationContextcontext=new
("src/main/resources/applicationContext-rabbitmq-consumer.xml");}}RabbitMQ與Spring整合范例代碼59經(jīng)常不斷地學(xué)習(xí),你就什么都知道。你知道得越多,你就越有力量StudyConstantly,AndYouWillKnowEverything.TheMoreYouKnow,TheMorePowerfulYouWillBe寫在最后經(jīng)常不斷地學(xué)習(xí),你就什么都知道。你知道得越多,你就越有力量寫60謝謝你的到來學(xué)習(xí)并沒有結(jié)束,希望大家繼續(xù)努力LearningIsNotOver.IHopeYouWillContinueToWorkHard演講人:XXXXXX時(shí)間:XX年XX月XX日
謝謝你的到來演講人:XXXXXX61RabbitMQ介紹
RabbitMQ介紹
62目錄什么是MQMQ有什么優(yōu)勢哪些情況下建議使用MQ什么是RabbitMQ選擇RabbitMQ理由RabbitMQ服務(wù)場景RabbitMQ結(jié)構(gòu)圖RabbitMQ名詞解釋目錄什么是MQ63目錄RabbitMQ客戶端使用流程(productor/cunsumer)Productor范例代碼及注意事項(xiàng)Consumer范例代碼及注意事項(xiàng)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)RabbitMQ與Spring整合范例代碼目錄RabbitMQ客戶端使用流程(productor/cu64什么是MQ?MQ全稱為MessageQueue,消息隊(duì)列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法(消息傳遞),一般用作進(jìn)程間通訊MQ有什么優(yōu)勢?MQ本身是異步的,往隊(duì)列里發(fā)送消息后無需等待,不同于通信協(xié)議。如HTTP協(xié)議(同步),客戶端發(fā)出請求后必須等待服務(wù)器回應(yīng)哪些情況下建議使用MQ高并發(fā)應(yīng)用來不及處理,實(shí)時(shí)性要求不高多應(yīng)用之間異步通信,且耗時(shí)操作什么是MQ?65什么是RabbitMQRabbitMQ是由Erlang(愛立信公司)語言開發(fā),實(shí)現(xiàn)AdvancedMessageQueuingProtocol(AMQP高級消息隊(duì)列協(xié)議)的消息中間件。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。什么是RabbitMQRabbitMQ是由Erlang(愛立66選擇RabbitMQ理由Reliability可靠性Exchange交換機(jī)、Queue隊(duì)列、Message消息持久化、高可用性FlexibleRouting靈活路由Clustering集群分為Disc(硬盤)與RAM(內(nèi)存),保證至少一臺DiscHighlyAvailableQueues高可用隊(duì)列與集群結(jié)合使用,設(shè)置隊(duì)列間的消息同步ManagementUI管理界面選擇RabbitMQ理由Reliability可靠性67異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電/隊(duì)列爆滿)單機(jī)丟失RabbitMQ支持集群,多臺機(jī)器隊(duì)列同步,丟失消息可從其他機(jī)器上獲取網(wǎng)絡(luò)丟失掉電RabbitMQ支持持久化,數(shù)據(jù)保存在硬盤上隊(duì)列爆滿RabbitMQ支持流控機(jī)制,可修改內(nèi)存大小,默認(rèn)為機(jī)器內(nèi)存的40%異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電68RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(異步)WorkQueues(消息均勻分配消息給消費(fèi)者)Publish/Subscribe(廣播模式,消息分發(fā)給所有的消費(fèi)者)Routing(消費(fèi)者接收消息由路由規(guī)則決定,簡單路由名)Topics(消費(fèi)者接收消息由路由規(guī)則決定,路由規(guī)則名比較復(fù)雜)RPC遠(yuǎn)程調(diào)用(同步)RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(69RabbitMQ結(jié)構(gòu)圖RabbitMQ結(jié)構(gòu)圖70RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例如RabbitMQ服務(wù)Vhost:虛擬主機(jī),默認(rèn)為“/”,一個(gè)broker里可以有多個(gè)vhost,區(qū)分不同用戶權(quán)限,類似java的命令空間Connection:應(yīng)用程序與broker連接,可有多個(gè)連接Channel:消息通道,connection中可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù),所有操作都在channel中進(jìn)行。RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例71RabbitMQ名詞解釋Exchange:消息交換機(jī),channel中可有多個(gè),用于投遞消息。應(yīng)用程序發(fā)送消息時(shí)先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,不是直接給隊(duì)列Queue:隊(duì)列,用于存放消息Message:消息,應(yīng)用程序需要發(fā)送的數(shù)據(jù)Bind:根據(jù)routingKey綁定exchange與queue規(guī)則,決定消息發(fā)送的方向RabbitMQ名詞解釋Exchange:消息交換機(jī),cha72RabbitMQ對象間關(guān)系broker可多個(gè)Connection可多個(gè)Channel可多個(gè)Exchange可多個(gè)QueuemessageRabbitMQ對象間關(guān)系broker可多個(gè)可多個(gè)可多個(gè)可多73Exchange主要3種類型Fanout:不處理路由鍵(沒有routingKey),只需把隊(duì)列綁定到交換機(jī)上。發(fā)送到交換機(jī)的消
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 土地驗(yàn)收檢查方案(3篇)
- 教研活動(dòng)競標(biāo)方案(3篇)
- 商鋪拆遷定價(jià)方案(3篇)
- 球館裝修規(guī)劃方案(3篇)
- 協(xié)商經(jīng)費(fèi)分?jǐn)偡桨?3篇)
- 故宮木器修繕方案(3篇)
- DB23-T3036-2021-篤斯越桔野生資源調(diào)查技術(shù)規(guī)程-黑龍江省
- 鄉(xiāng)村小院低價(jià)改造方案(3篇)
- DB23-T2890-2021-元蘑林下栽培技術(shù)規(guī)程-黑龍江省
- DB23-T2879-2021-藍(lán)靛果與中藥材赤芍復(fù)合種植技術(shù)規(guī)程-黑龍江省
- 二級計(jì)量師考試歷年真題題庫和答案2024
- 23G409先張法預(yù)應(yīng)力混凝土管樁
- 2022年高考真題-政治(重慶卷) 含答案
- 探索心理學(xué)的奧秘智慧樹知到期末考試答案章節(jié)答案2024年北京大學(xué)
- 鐵工電〔2023〕54號國鐵集團(tuán)關(guān)于印發(fā)《普速鐵路工務(wù)安全規(guī)則》的通知
- 數(shù)據(jù)中心機(jī)房工程施工組織方案
- 績效管理全套ppt課件(完整版)
- 酒店二次供水應(yīng)急預(yù)案
- 土工布檢測報(bào)告土工布產(chǎn)品屬性
- 導(dǎo)流明渠混凝土施工方案
- JJF 1175-2021 試驗(yàn)篩校準(zhǔn)規(guī)范_(高清-最新版)
評論
0/150
提交評論