rabbitmq介紹122教學(xué)講解課件_第1頁
rabbitmq介紹122教學(xué)講解課件_第2頁
rabbitmq介紹122教學(xué)講解課件_第3頁
rabbitmq介紹122教學(xué)講解課件_第4頁
rabbitmq介紹122教學(xué)講解課件_第5頁
已閱讀5頁,還剩117頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

RabbitMQ介紹

RabbitMQ介紹

1目錄什么是MQMQ有什么優(yōu)勢哪些情況下建議使用MQ什么是RabbitMQ選擇RabbitMQ理由RabbitMQ服務(wù)場景RabbitMQ結(jié)構(gòu)圖RabbitMQ名詞解釋目錄什么是MQ2目錄RabbitMQ客戶端使用流程(productor/cunsumer)Productor范例代碼及注意事項(xiàng)Consumer范例代碼及注意事項(xiàng)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)RabbitMQ與Spring整合范例代碼目錄RabbitMQ客戶端使用流程(productor/cu3什么是MQ?MQ全稱為MessageQueue,消息隊(duì)列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法(消息傳遞),一般用作進(jìn)程間通訊MQ有什么優(yōu)勢?MQ本身是異步的,往隊(duì)列里發(fā)送消息后無需等待,不同于通信協(xié)議。如HTTP協(xié)議(同步),客戶端發(fā)出請求后必須等待服務(wù)器回應(yīng)哪些情況下建議使用MQ高并發(fā)應(yīng)用來不及處理,實(shí)時(shí)性要求不高多應(yīng)用之間異步通信,且耗時(shí)操作什么是MQ?4什么是RabbitMQRabbitMQ是由Erlang(愛立信公司)語言開發(fā),實(shí)現(xiàn)AdvancedMessageQueuingProtocol(AMQP高級消息隊(duì)列協(xié)議)的消息中間件。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。什么是RabbitMQRabbitMQ是由Erlang(愛立5選擇RabbitMQ理由Reliability可靠性Exchange交換機(jī)、Queue隊(duì)列、Message消息持久化、高可用性FlexibleRouting靈活路由Clustering集群分為Disc(硬盤)與RAM(內(nèi)存),保證至少一臺DiscHighlyAvailableQueues高可用隊(duì)列與集群結(jié)合使用,設(shè)置隊(duì)列間的消息同步ManagementUI管理界面選擇RabbitMQ理由Reliability可靠性6異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電/隊(duì)列爆滿)單機(jī)丟失RabbitMQ支持集群,多臺機(jī)器隊(duì)列同步,丟失消息可從其他機(jī)器上獲取網(wǎng)絡(luò)丟失掉電RabbitMQ支持持久化,數(shù)據(jù)保存在硬盤上隊(duì)列爆滿RabbitMQ支持流控機(jī)制,可修改內(nèi)存大小,默認(rèn)為機(jī)器內(nèi)存的40%異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電7RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(異步)WorkQueues(消息均勻分配消息給消費(fèi)者)Publish/Subscribe(廣播模式,消息分發(fā)給所有的消費(fèi)者)Routing(消費(fèi)者接收消息由路由規(guī)則決定,簡單路由名)Topics(消費(fèi)者接收消息由路由規(guī)則決定,路由規(guī)則名比較復(fù)雜)RPC遠(yuǎn)程調(diào)用(同步)RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(8RabbitMQ結(jié)構(gòu)圖RabbitMQ結(jié)構(gòu)圖9RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例如RabbitMQ服務(wù)Vhost:虛擬主機(jī),默認(rèn)為“/”,一個(gè)broker里可以有多個(gè)vhost,區(qū)分不同用戶權(quán)限,類似java的命令空間Connection:應(yīng)用程序與broker連接,可有多個(gè)連接Channel:消息通道,connection中可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù),所有操作都在channel中進(jìn)行。RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例10RabbitMQ名詞解釋Exchange:消息交換機(jī),channel中可有多個(gè),用于投遞消息。應(yīng)用程序發(fā)送消息時(shí)先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,不是直接給隊(duì)列Queue:隊(duì)列,用于存放消息Message:消息,應(yīng)用程序需要發(fā)送的數(shù)據(jù)Bind:根據(jù)routingKey綁定exchange與queue規(guī)則,決定消息發(fā)送的方向RabbitMQ名詞解釋Exchange:消息交換機(jī),cha11RabbitMQ對象間關(guān)系broker可多個(gè)Connection可多個(gè)Channel可多個(gè)Exchange可多個(gè)QueuemessageRabbitMQ對象間關(guān)系broker可多個(gè)可多個(gè)可多個(gè)可多12Exchange主要3種類型Fanout:不處理路由鍵(沒有routingKey),只需把隊(duì)列綁定到交換機(jī)上。發(fā)送到交換機(jī)的消息都會(huì)轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上,類似于廣播,轉(zhuǎn)發(fā)消息是最快的Exchange主要3種類型Fanout:不處理路由鍵(沒有13Exchange主要3種類型Direct:處理路由鍵(有routingKey)。將一隊(duì)列綁定到交換機(jī)上,該消息需與一個(gè)特定的路由鍵(routingKey)完全匹配Exchange主要3種類型Direct:處理路由鍵(有ro14Exchange主要3種類型Topic:與direct類似,功能更強(qiáng),支持模糊綁定*表示通配一個(gè)詞#表示通配0個(gè)或多個(gè)詞Exchange主要3種類型Topic:與direct類似,15RabbitMQ客戶端使用流程(productor/cunsumer)RabbitMQ客戶端使用流程(productor/cuns16Productor范例代碼及注意事項(xiàng)//以exchange為direct為例package

com.rabbitmq.test.ow.demo2;import

java.io.IOException;import

com.rabbitmq.client.BlockedListener;import

com.rabbitmq.client.Channel;import

com.rabbitmq.client.Connection;import

com.rabbitmq.client.ConnectionFactory;import

com.rabbitmq.client.MessageProperties;import

com.rabbitmq.client.ShutdownListener;import

com.rabbitmq.client.ShutdownSignalException;import

com.rabbitmq.client.impl.AMQCommand;import

com.rabbitmq.client.impl.AMQImpl;/***

Title:

Producer.java*

Description:【生產(chǎn)者樣例】*

@author:

zengqiang.yang*

@date:

2013-12-25**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司

*/public

class

Producer{

//交換機(jī)命名規(guī)范:e_模塊_其他

private

static

final

StringEXCHANGE_NAME

="e_tyb_test";

//routingkey命名規(guī)范:r_模塊_其他

private

static

final

StringROUTING_KEY

="r_tyb_test";

public

static

void

main(String[]argv)throws

Exception{

//注意:factory應(yīng)為單例,不要每次取消息新建一次對象

ConnectionFactoryfactory=new

ConnectionFactory();

factory.setHost("90");

factory.setPort(5672);//默認(rèn)端口Productor范例代碼及注意事項(xiàng)//以exchange為17Productor范例代碼及注意事項(xiàng)

factory.setUsername("guest");//默認(rèn)用戶名

factory.setPassword("guest");//默認(rèn)密碼

factory.setVirtualHost("/");//默認(rèn)虛擬主機(jī),區(qū)分權(quán)限

//設(shè)置心跳時(shí)間,防止長時(shí)間未活動(dòng)被防火墻殺死,默認(rèn)600秒,單位:秒//

factory.setRequestedHeartbeat(60*4);

//連接超時(shí)時(shí)間,單位:毫秒//

factory.setConnectionTimeout(1000*2);

//注意:connection應(yīng)為單例,不要每次取消息新建一次對象

Connectionconnection=factory.newConnection();

//監(jiān)聽connection關(guān)閉異常

connection.addShutdownListener(new

ShutdownListener(){ @Override

public

void

shutdownCompleted(ShutdownSignalExceptioncause){

//connection異常

if

(cause.isHardError()){

System.out.println("connection異常:["

+cause.getMessage()+"]");

//程序引起的異常,如:connection.close()

if

(cause.isInitiatedByApplication()){

System.out.println("connection關(guān)閉異常,重連...begin...");//

connection=factory.newConnection();

System.out.println("connection關(guān)閉異常,重連...end...");

}else{//rabbitmq服務(wù)引起的異常

AMQCommandamqCommand=(AMQCommand)cause.getReason();

if(amqCommand.getMethod()instanceof

AMQImpl.Connection.Close){

AMQImpl.Connection.Closeclose=(AMQImpl.Connection.Close)amqCommand.getMethod();

if(320==close.getReplyCode()){//rabbitmq服務(wù)器強(qiáng)制關(guān)閉

System.out.println("connection關(guān)閉異常,請檢查rabbitmq服務(wù)器是否正常啟動(dòng)!");

}}}}}});Productor范例代碼及注意事項(xiàng) factory.set18Productor范例代碼及注意事項(xiàng)

//監(jiān)聽connection阻塞異常

connection.addBlockedListener(new

BlockedListener(){

@Override

public

void

handleUnblocked()throws

IOException{

System.out.println("connection已解除阻塞!");

}

@Override

public

void

handleBlocked(Stringreason)throws

IOException{

System.out.println("connection阻塞原因:["+reason+"],請檢查內(nèi)存是否夠!");

}

});

//注意:channel應(yīng)為單例,不要每次取消息新建一次對象

Channelchannel=connection.createChannel();

//監(jiān)聽channel關(guān)閉異常

channel.addShutdownListener(new

ShutdownListener(){

@Override

public

void

shutdownCompleted(ShutdownSignalExceptioncause){

//channel異常

if

(!cause.isHardError()){System.out.println("channel異常:["

+cause.getMessage()+"]");

}

}

});

Productor范例代碼及注意事項(xiàng)19Productor范例代碼及注意事項(xiàng)

/** *創(chuàng)建交換機(jī) *exchange 交換機(jī)名 *type

交換機(jī)類型 fanout:廣播模式,所有消費(fèi)者都能收到生產(chǎn)者發(fā)送的消息,速度更快,不需設(shè)置routingkey * direct:只有與routingkey配置的消費(fèi)者才能收到消息 * topic:與direct相同,只是支持模糊配置,類似正則表達(dá)式,功能更強(qiáng), **表示通配一個(gè)詞,#表示通配0個(gè)或多個(gè)詞(注意:是詞,不是字母) * *durable

durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失; * durable=false,相反 * */

channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);

//模擬發(fā)送消息

for

(int

i=0;i<100;i++){

Stringmessage="HelloWorld!";

message+=i;

/***exchange 交換機(jī)名,""為默認(rèn)交換機(jī),direct類型

*routingKey

exchange為direct、topic類型時(shí)指定routingKey,exchange為fanout類型時(shí)指定queueName隊(duì)列名 *props

MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化,rabbitmq服務(wù)重啟消息不會(huì)丟失;null:非持久化 *body 發(fā)送消息 */

channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

System.out.println("[x]Sent'"

+message+"'");

}

//關(guān)閉連接//

channel.close();//

connection.close();

}}Productor范例代碼及注意事項(xiàng) /**20Productor范例代碼及注意事項(xiàng)ConnectionFactory、Connection、Channel注意單例控制默認(rèn)心跳時(shí)間為10分鐘,發(fā)現(xiàn)個(gè)人賬戶環(huán)境防火墻為5分鐘,有可能被防火墻殺死,建議設(shè)置4分鐘connectionFactory.setRequestedHeartbeat(4*60);添加Shutdown、Blocked異常監(jiān)聽,Shutdown后重連機(jī)制、Blocked后的日志輸出Productor范例代碼及注意事項(xiàng)ConnectionFa21Consumer范例代碼及注意事項(xiàng)//以exchange為direct為例package

com.rabbitmq.test.ow.demo2;import

java.io.IOException;import

java.util.HashMap;import

java.util.Map;import

com.rabbitmq.client.BlockedListener;import

com.rabbitmq.client.Channel;import

com.rabbitmq.client.Connection;import

com.rabbitmq.client.ConnectionFactory;import

com.rabbitmq.client.QueueingConsumer;import

com.rabbitmq.client.ShutdownListener;import

com.rabbitmq.client.ShutdownSignalException;import

com.rabbitmq.client.impl.AMQCommand;import

com.rabbitmq.client.impl.AMQImpl;/***

Title:

Consumer.java*

Description:【消費(fèi)者樣例】*

@author:

zengqiang.yang*

@date:

2013-12-25**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司

*/Consumer范例代碼及注意事項(xiàng)//以exchange為d22Consumer范例代碼及注意事項(xiàng)public

class

Consumer{

//交換機(jī)命名規(guī)范:e_模塊_其他

private

static

final

StringEXCHANGE_NAME

="e_tyb_test";

//隊(duì)列命名規(guī)范:q_模塊_其他

private

static

final

StringQUEUE_NAME

="q_tyb_test";

//routingkey命名規(guī)范:r_模塊_其他

private

static

final

StringROUTING_KEY

="r_tyb_test";

public

static

void

main(String[]argv)throws

Exception{

//注意:factory應(yīng)為單例,不要每次取消息新建一次對象

ConnectionFactoryfactory=new

ConnectionFactory();

factory.setHost("90");

factory.setPort(5672);//默認(rèn)端口

factory.setUsername("guest");//默認(rèn)用戶名

factory.setPassword("guest");//默認(rèn)密碼

factory.setVirtualHost("/");//默認(rèn)虛擬主機(jī),區(qū)分權(quán)限

//設(shè)置心跳時(shí)間,防止長時(shí)間未活動(dòng)被防火墻殺死,默認(rèn)600秒,單位:秒//

factory.setRequestedHeartbeat(60*4);Consumer范例代碼及注意事項(xiàng)publicclass23Consumer范例代碼及注意事項(xiàng)

//連接超時(shí)時(shí)間,單位:毫秒//

factory.setConnectionTimeout(1000*2);

//注意:connection應(yīng)為單例,不要每次取消息新建一次對象

Connectionconnection=factory.newConnection();

//監(jiān)聽connection關(guān)閉異常

connection.addShutdownListener(new

ShutdownListener(){

@Override

public

void

shutdownCompleted(ShutdownSignalExceptioncause){

//connection異常

if

(cause.isHardError()){

System.out.println("connection異常:["

+cause.getMessage()+"]");

//程序引起的異常,如:connection.close()

if

(cause.isInitiatedByApplication()){

System.out.println("connection關(guān)閉異常,重連...begin...");//

connection=factory.newConnection();

System.out.println("connection關(guān)閉異常,重連...end...");

}

Consumer范例代碼及注意事項(xiàng) //連接超時(shí)時(shí)間,單位:24Consumer范例代碼及注意事項(xiàng)else{//rabbitmq服務(wù)引起的異常

AMQCommandamqCommand=(AMQCommand)cause.getReason();

if(amqCommand.getMethod()instanceof

AMQImpl.Connection.Close){

AMQImpl.Connection.Closeclose=(AMQImpl.Connection.Close)amqCommand.getMethod();

if(320==close.getReplyCode()){//rabbitmq服務(wù)器強(qiáng)制關(guān)閉

System.out.println("connection關(guān)閉異常,請檢查rabbitmq服務(wù)器是否正常啟動(dòng)!");

}

}

}

}

}

});

//監(jiān)聽connection阻塞異常

connection.addBlockedListener(new

BlockedListener(){

@Override

public

void

handleUnblocked()throws

IOException{

System.out.println("connection已解除阻塞!");

}

@Override

public

void

handleBlocked(Stringreason)throws

IOException{

System.out.println("connection阻塞原因:["+reason+"],請檢查內(nèi)存是否夠!");

}

});Consumer范例代碼及注意事項(xiàng)else{//rabbit25Consumer范例代碼及注意事項(xiàng)

//注意:channel應(yīng)為單例,不要每次取消息新建一次對象

Channelchannel=connection.createChannel();

//監(jiān)聽channel關(guān)閉異常

channel.addShutdownListener(new

ShutdownListener(){

@Override

public

void

shutdownCompleted(ShutdownSignalExceptioncause){

//channel異常

if

(!cause.isHardError()){

System.out.println("channel異常:["

+cause.getMessage()+"]");

}

}

});

//同一時(shí)間一個(gè)消費(fèi)者只能接收一條消息,web管理界面隊(duì)列Unacked數(shù)值,如多個(gè)隊(duì)列數(shù)值累加

channel.basicQos(1);

Consumer范例代碼及注意事項(xiàng)26Consumer范例代碼及注意事項(xiàng)/***創(chuàng)建交換機(jī)*exchange 交換機(jī)名

*type 交換機(jī)類型 fanout:廣播模式,所有消費(fèi)者都能收到生產(chǎn)者發(fā)送的消息,速度更快,不需設(shè)置routingkey* direct:只有與routingkey配置的消費(fèi)者才能收到消息* topic:與direct相同,只是支持模糊配置,類似正則表達(dá)式,功能更強(qiáng),**表示通配一個(gè)詞,#表示通配0個(gè)或多個(gè)詞(注意:是詞,不是字母)* *durable

durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;* durable=false,相反**/

channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);

/***創(chuàng)建隊(duì)列*queue 隊(duì)列名*durable 持久化隊(duì)列,true:服務(wù)器重啟隊(duì)列不會(huì)丟失;false:反之*exclusive 排他性,true:首次申明的connection連接下可見;* false:所有connection連接下都可見*connection關(guān)閉后隊(duì)列自動(dòng)刪除,忽略隊(duì)列持久化*autoDelete

true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除*arguments 可指定隊(duì)列里消息總數(shù)*/

Map<String,Object>args

=new

HashMap<String,Object>();//

args.put("x-max-length",100);

//設(shè)置隊(duì)列里的最大消息數(shù)//

args.put("x-message-ttl",1000*10);

//設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒//

channel.queueDeclare(QUEUE_NAME,true,false,false,args);

channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer范例代碼及注意事項(xiàng)/**27Consumer范例代碼及注意事項(xiàng)

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);

System.out.println("[*]Waitingformessages.ToexitpressCTRL+C");

//聲明一個(gè)消費(fèi)者

QueueingConsumerconsumer=new

QueueingConsumer(channel);

/** *queue 隊(duì)列名 *autoAck

true:消息從隊(duì)列刪除,不管是否正確處理;false:消息不從隊(duì)列刪除,需要ack響應(yīng) *callback 消費(fèi)者 */

channel.basicConsume(QUEUE_NAME,false,consumer);

while

(true){

//循環(huán)獲取消息

QueueingConsumer.Deliverydelivery=consumer.nextDelivery();

Stringmessage=new

String(delivery.getBody());

System.out.println("[x]Received'"

+message+"'");

doWork(message);

System.out.println("[x]Done");

//確認(rèn),消息從隊(duì)列中刪除

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

//拒絕,消息不從隊(duì)列刪除//

channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);

}

}

private

static

void

doWork(Stringtask)throws

InterruptedException{

for

(char

ch:task.toCharArray()){

if

(ch==''){

Thread.sleep(1000);

}

}

}}Consumer范例代碼及注意事項(xiàng)28Consumer范例代碼及注意事項(xiàng)ConnectionFactory、Connection、Channel注意單例控制默認(rèn)心跳時(shí)間為10分鐘,發(fā)現(xiàn)個(gè)人賬戶環(huán)境防火墻為5分鐘,有可能被防火墻殺死,建議設(shè)置4分鐘connectionFactory.setRequestedHeartbeat(4*60);添加Shutdown、Blocked異常監(jiān)聽,Shutdown后重連機(jī)制、Blocked后的日志輸出對于比較重要的消息,消費(fèi)者啟動(dòng)應(yīng)在生產(chǎn)者之前,避免生產(chǎn)者發(fā)送消息時(shí)消費(fèi)者未啟動(dòng)消息丟失Consumer范例代碼及注意事項(xiàng)ConnectionFac29開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理ConnectionFactory、Connection、Channel切勿每發(fā)送一條消息創(chuàng)建一次,因用單例控制Connection默認(rèn)心跳 //默認(rèn)600秒(10分鐘)connectionFactory.setRequestedHeartbeat(240);Exchange持久化 //durable=true持久化,durable=false非持久化 channel.exchangeDeclare(EXCHANGE_NAME,“direct”,true);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理ConnectionFact30開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue持久化 //durable=true持久化,durable=false非持久化 channel.queueDeclare(QUEUE_NAME,true,false,false,null);Message持久化 //props=MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化 channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue持久化31開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列排他性 //exclusive=true首次申明的connection連接下可見,

//exclusive=false所有connection連接下都可見 channel.queueDeclare(QUEUE_NAME,true,true,false,null);Queue隊(duì)列里消息數(shù)大小設(shè)置 Map<String,Object>args=newHashMap<String,Object>(); args.put("x-max-length",50);//設(shè)置隊(duì)列里的最大消息數(shù) channel.queueDeclare(QUEUE_NAME,true,false,false,args);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列排他性32開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列里消息有效性 Map<String,Object>args=newHashMap<String,Object>(); //設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒 args.put("x-message-ttl",1000*10); channel.queueDeclare(QUEUE_NAME,true,false,false,args);開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理Queue隊(duì)列里消息有效性33開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理關(guān)閉異常ShutdownSignalException通過Connection.addShutdownListener監(jiān)聽,可分為程序引起(如:connection.close())和服務(wù)器引起兩大異常,程序引起建議捕獲異常后有重連機(jī)制阻塞異常通過connection.addBlockedListener監(jiān)聽,當(dāng)使用內(nèi)存接近最大內(nèi)存時(shí),消息會(huì)阻塞(handleBlocked方法

捕獲),可通過調(diào)整內(nèi)存大小解除阻塞(handleUnblocked方法捕獲)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理關(guān)閉異常ShutdownSi34RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加用戶并指定密碼./rabbitmqctladd_user用戶名

密碼設(shè)置用戶權(quán)限./rabbitmqctlset_user_tags用戶名

權(quán)限組權(quán)限組4類Management PolicymakerMonitoringAdministratorRabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加用戶并指定密碼35Management查看rabbitmq服務(wù)總消息數(shù)、當(dāng)前與多少機(jī)器相連(connection)、建了多少通道(channel)、建立了多少隊(duì)列數(shù)等Management36Policymaker比management多了一個(gè)Admin選項(xiàng)卡中的policies標(biāo)簽,可添加鏡像隊(duì)列,滿足集群情況下隊(duì)列的同步Policymaker37Monitoring比management多了Overview選項(xiàng)卡中的Nodes、Portsandcontexts標(biāo)簽,可查看當(dāng)前打開socket連接數(shù)、rabbitmq內(nèi)存、硬盤使用情況;amqp端口(默認(rèn)5672)及Webmanagement管理界面端口(默認(rèn)15672,55672是rabbitmq3.0之前版本端口號)Monitoring38Administrator最高權(quán)限,用戶、虛擬主機(jī)、策略等都可設(shè)置Administrator39RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加vhost./rabbitmqctladd_vhostvhost名設(shè)置vhost權(quán)限./rabbitmqctlset_permissions-pvhost名

用戶名".*"".*"".*“Vhost下的所有資源都有讀寫權(quán)限RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加vhost40RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加鏡像隊(duì)列策略./rabbitmqctl-n節(jié)點(diǎn)名set_policy-pvhost名

策略名"^q\_tyb\_"'{"ha-mode":"all","ha-sync-mode":"automatic"}‘^q\_tyb\_設(shè)置隊(duì)列名規(guī)則ha-mode:all所有機(jī)器都接收(集群環(huán)境下)ha-sync-mode:automatic某一臺機(jī)器停止重啟后,自動(dòng)同步隊(duì)列里的消息RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)添加鏡像隊(duì)列策略41RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)最大打開文件數(shù)及socket連接數(shù)如ConnectionFactory、Connection、Channel未控制單例,每發(fā)送一條消息就創(chuàng)建一次連接,file、socket數(shù)也隨之增加,file與socket總數(shù)比例大約80%RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)最大打開文件數(shù)及sock42RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)rabbitmq內(nèi)存、硬盤使用空間rabbitmq最大內(nèi)存默認(rèn)為機(jī)器內(nèi)存的40%rabbitmq磁盤空間默認(rèn)為rabbitmq數(shù)據(jù)所在分區(qū)的可用空閑空間RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)rabbitmq內(nèi)存、硬43RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Exchange交換機(jī)的持久化交換機(jī)持久化,rabbitmq服務(wù)重啟exchange依然存在,數(shù)據(jù)保存在硬盤上RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Exchange交換機(jī)的44RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Queue隊(duì)列的持久化隊(duì)列持久化,rabbitmq服務(wù)重啟queue依然存在,數(shù)據(jù)保存在硬盤上

RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)Queue隊(duì)列的持久化45RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列最大消息數(shù)(可選)當(dāng)隊(duì)列里消息數(shù)到達(dá)最大值時(shí),后面消息會(huì)替換原先的消息,隊(duì)列里消息是先進(jìn)先出原則代碼Map<String,Object>args=newHashMap<String,Object>(); args.put("x-max-length",50);//設(shè)置隊(duì)列里的最大消息數(shù) channel.queueDeclare(QUEUE_NAME,true,false,false,args);RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列最大消息數(shù)(可選)46示意圖示意圖47RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)消息有效性(可選)消息進(jìn)入隊(duì)列過了指定時(shí)間(單位:毫秒),自動(dòng)刪除消息代碼 Map<String,Object>args=newHashMap<String,Object>(); //設(shè)置消息過期時(shí)間,時(shí)間一過消息自動(dòng)刪除,單位:毫秒 args.put("x-message-ttl",1000*10); channel.queueDeclare(QUEUE_NAME,true,false,false,args);RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)消息有效性(可選)48示意圖示意圖49RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)多機(jī)集群隊(duì)列同步如果一臺RabbitMQ服務(wù)掛了,其他機(jī)器上會(huì)有一份副本。RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)多機(jī)集群隊(duì)列同步50RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列命名規(guī)范,如:q_模塊_其他影響隊(duì)列同步,多機(jī)集群間隊(duì)列的同步需要使用policy策略來保證RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)隊(duì)列命名規(guī)范,如:q_模51RabbitMQ與Spring整合范例代碼生產(chǎn)者applicationContext-rabbitmq-producer.xml配置文件<?xml

version="1.0"

encoding="UTF-8"?><beans

xmlns=""

xmlns:xsi=""xmlns:context=""

xmlns:rabbit=""

xsi:schemaLocation="

">

<!--連接服務(wù)配置-->

<rabbit:connection-factory

id="connectionFactory"

host="90"

username="guest"

password="guest"

port="5672"

/>

<rabbit:admin

connection-factory="connectionFactory"

/>

RabbitMQ與Spring整合范例代碼生產(chǎn)者applic52RabbitMQ與Spring整合范例代碼<!--queue隊(duì)列聲明-->

<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->

<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->

<!--排他性,exclusive=true:首次申明的connection連接下可見;exclusive=false:所有connection連接下都可見-->

<rabbit:queue

id="q_tyb_test2"

durable="true"

auto-delete="false"

exclusive="false"

name="q_tyb_test2"

/>

<!--exchangequeuebingingkey綁定-->

<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->

<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->

<rabbit:direct-exchangename="e_tyb_test2"

durable="true"

auto-delete="false"

id="e_tyb_test2">

<rabbit:bindings>

<rabbit:binding

queue="q_tyb_test2"

key="r_tyb_test2"

/>

</rabbit:bindings>

</rabbit:direct-exchange>

<rabbit:template

id="amqpTemplate"

exchange="e_tyb_test2"

connection-factory="connectionFactory"

/></beans>RabbitMQ與Spring整合范例代碼53RabbitMQ與Spring整合范例代碼//生產(chǎn)者packagecom.rabbitmq.test.ow.spring;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.context.ApplicationContext;importorg.springframework.context.support.;/***

Title:

Producer.java*

Description:【rabbitmq與spring整合-生產(chǎn)者】*

@author:

zengqiang.yang*

@date:

2013-12-30**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司

*/public

classProducer{public

static

voidmain(String[]args){ApplicationContextcontext=new

("src/main/resources/applicationContext-rabbitmq-producer.xml");RabbitMQ與Spring整合范例代碼//生產(chǎn)者54RabbitMQ與Spring整合范例代碼AmqpTemplateamqpTemplate=(AmqpTemplate)context.getBean("amqpTemplate");//模擬發(fā)送消息for(inti=0;i<100;i++){Stringmessage="HelloWorld!";message+=i;/***r_tyb_test2routingkey*message發(fā)送消息*/amqpTemplate.convertAndSend("r_tyb_test2",message);System.out.println("Sent:"+message);}}}RabbitMQ與Spring整合范例代碼55RabbitMQ與Spring整合范例代碼消費(fèi)者applicationContext-rabbitmq-consumer.xml配置文件<?xml

version="1.0"

encoding="UTF-8"?><beans

xmlns=""

xmlns:xsi=""

xmlns:context=""

xmlns:rabbit=""

xsi:schemaLocation="

">

<!--連接服務(wù)配置-->

<rabbit:connection-factory

id="connectionFactory"

host="90"

username="guest"

password="guest"

port="5672"

/>

<rabbit:admin

connection-factory="connectionFactory"

/>

<!--queue隊(duì)列聲明-->

<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->

<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->

<!--排他性,exclusive=true:首次申明的connection連接下可見;exclusive=false:所有connection連接下都可見-->RabbitMQ與Spring整合范例代碼消費(fèi)者applic56RabbitMQ與Spring整合范例代碼

<rabbit:queue

id="q_tyb_test2"

durable="true"

auto-delete="false"

exclusive="false"

name="q_tyb_test2"

/>

<!--exchangequeuebinging

key綁定-->

<!--durable=true,交換機(jī)持久化,rabbitmq服務(wù)重啟交換機(jī)依然存在,保證不丟失;durable=false,相反-->

<!--auto-delete=true:無消費(fèi)者時(shí),隊(duì)列自動(dòng)刪除;auto-delete=false:無消費(fèi)者時(shí),隊(duì)列不會(huì)自動(dòng)刪除-->

<rabbit:direct-exchange

name="e_tyb_test2"

durable="true"

auto-delete="false"

id="e_tyb_test2">

<rabbit:bindings>

<rabbit:binding

queue="q_tyb_test2"

key="r_tyb_test2"

/>

</rabbit:bindings>

</rabbit:direct-exchange>

<!--定義消費(fèi)者監(jiān)聽-->

<bean

id="consumerLitener"

class="com.rabbitmq.test.ow.spring.ConsumerLitener"></bean>

<rabbit:listener-container

connection-factory="connectionFactory"

acknowledge="auto">

<rabbit:listener

queues="q_tyb_test2"

ref="consumerLitener"

/>

</rabbit:listener-container></beans>RabbitMQ與Spring整合范例代碼 <rabbit:57RabbitMQ與Spring整合范例代碼//消費(fèi)者packagecom.rabbitmq.test.ow.spring;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageListener;importorg.springframework.context.ApplicationContext;importorg.springframework.context.support.;/***

Title:

ConsumerLitener.java*

Description:【rabbitmq與spring整合-消費(fèi)者】*

@author:

zengqiang.yang*

@date:

2013-12-30**版權(quán)所有(c)2013,天翼電子商務(wù)有限公司

*/public

classConsumerLitenerimplementsMessageListener{@Overridepublic

voidonMessage(Messagearg0){RabbitMQ與Spring整合范例代碼//消費(fèi)者58RabbitMQ與Spring整合范例代碼Stringmessage=newString(arg0.getBody());try{Thread.sleep(1000);System.out.println("Recv:"+message);}catch(InterruptedExceptione){e.printStackTrace();}}public

static

voidmain(String[]args){

//啟動(dòng)容器ApplicationContextcontext=new

("src/main/resources/applicationContext-rabbitmq-consumer.xml");}}RabbitMQ與Spring整合范例代碼59經(jīng)常不斷地學(xué)習(xí),你就什么都知道。你知道得越多,你就越有力量StudyConstantly,AndYouWillKnowEverything.TheMoreYouKnow,TheMorePowerfulYouWillBe寫在最后經(jīng)常不斷地學(xué)習(xí),你就什么都知道。你知道得越多,你就越有力量寫60謝謝你的到來學(xué)習(xí)并沒有結(jié)束,希望大家繼續(xù)努力LearningIsNotOver.IHopeYouWillContinueToWorkHard演講人:XXXXXX時(shí)間:XX年XX月XX日

謝謝你的到來演講人:XXXXXX61RabbitMQ介紹

RabbitMQ介紹

62目錄什么是MQMQ有什么優(yōu)勢哪些情況下建議使用MQ什么是RabbitMQ選擇RabbitMQ理由RabbitMQ服務(wù)場景RabbitMQ結(jié)構(gòu)圖RabbitMQ名詞解釋目錄什么是MQ63目錄RabbitMQ客戶端使用流程(productor/cunsumer)Productor范例代碼及注意事項(xiàng)Consumer范例代碼及注意事項(xiàng)開發(fā)中注意事項(xiàng)及重點(diǎn)關(guān)注異常處理RabbitMQ服務(wù)端配置及重點(diǎn)參數(shù)RabbitMQ與Spring整合范例代碼目錄RabbitMQ客戶端使用流程(productor/cu64什么是MQ?MQ全稱為MessageQueue,消息隊(duì)列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法(消息傳遞),一般用作進(jìn)程間通訊MQ有什么優(yōu)勢?MQ本身是異步的,往隊(duì)列里發(fā)送消息后無需等待,不同于通信協(xié)議。如HTTP協(xié)議(同步),客戶端發(fā)出請求后必須等待服務(wù)器回應(yīng)哪些情況下建議使用MQ高并發(fā)應(yīng)用來不及處理,實(shí)時(shí)性要求不高多應(yīng)用之間異步通信,且耗時(shí)操作什么是MQ?65什么是RabbitMQRabbitMQ是由Erlang(愛立信公司)語言開發(fā),實(shí)現(xiàn)AdvancedMessageQueuingProtocol(AMQP高級消息隊(duì)列協(xié)議)的消息中間件。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。什么是RabbitMQRabbitMQ是由Erlang(愛立66選擇RabbitMQ理由Reliability可靠性Exchange交換機(jī)、Queue隊(duì)列、Message消息持久化、高可用性FlexibleRouting靈活路由Clustering集群分為Disc(硬盤)與RAM(內(nèi)存),保證至少一臺DiscHighlyAvailableQueues高可用隊(duì)列與集群結(jié)合使用,設(shè)置隊(duì)列間的消息同步ManagementUI管理界面選擇RabbitMQ理由Reliability可靠性67異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電/隊(duì)列爆滿)單機(jī)丟失RabbitMQ支持集群,多臺機(jī)器隊(duì)列同步,丟失消息可從其他機(jī)器上獲取網(wǎng)絡(luò)丟失掉電RabbitMQ支持持久化,數(shù)據(jù)保存在硬盤上隊(duì)列爆滿RabbitMQ支持流控機(jī)制,可修改內(nèi)存大小,默認(rèn)為機(jī)器內(nèi)存的40%異常情況下RabbitMQ處理方式(單機(jī)丟失/網(wǎng)絡(luò)丟失/掉電68RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(異步)WorkQueues(消息均勻分配消息給消費(fèi)者)Publish/Subscribe(廣播模式,消息分發(fā)給所有的消費(fèi)者)Routing(消費(fèi)者接收消息由路由規(guī)則決定,簡單路由名)Topics(消費(fèi)者接收消息由路由規(guī)則決定,路由規(guī)則名比較復(fù)雜)RPC遠(yuǎn)程調(diào)用(同步)RabbitMQ服務(wù)場景應(yīng)用程序之間無需即時(shí)返回且耗時(shí)操作(69RabbitMQ結(jié)構(gòu)圖RabbitMQ結(jié)構(gòu)圖70RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例如RabbitMQ服務(wù)Vhost:虛擬主機(jī),默認(rèn)為“/”,一個(gè)broker里可以有多個(gè)vhost,區(qū)分不同用戶權(quán)限,類似java的命令空間Connection:應(yīng)用程序與broker連接,可有多個(gè)連接Channel:消息通道,connection中可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù),所有操作都在channel中進(jìn)行。RabbitMQ名詞解釋Broker:消息隊(duì)列服務(wù)器實(shí)體,例71RabbitMQ名詞解釋Exchange:消息交換機(jī),channel中可有多個(gè),用于投遞消息。應(yīng)用程序發(fā)送消息時(shí)先把消息給交換機(jī),由交換機(jī)投遞給隊(duì)列,不是直接給隊(duì)列Queue:隊(duì)列,用于存放消息Message:消息,應(yīng)用程序需要發(fā)送的數(shù)據(jù)Bind:根據(jù)routingKey綁定exchange與queue規(guī)則,決定消息發(fā)送的方向RabbitMQ名詞解釋Exchange:消息交換機(jī),cha72RabbitMQ對象間關(guān)系broker可多個(gè)Connection可多個(gè)Channel可多個(gè)Exchange可多個(gè)QueuemessageRabbitMQ對象間關(guān)系broker可多個(gè)可多個(gè)可多個(gè)可多73Exchange主要3種類型Fanout:不處理路由鍵(沒有routingKey),只需把隊(duì)列綁定到交換機(jī)上。發(fā)送到交換機(jī)的消

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論