《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka_第1頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka_第2頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka_第3頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka_第4頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka_第5頁
已閱讀5頁,還剩16頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》配套實(shí)驗(yàn)指導(dǎo)書實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka編寫者:西京學(xué)院-徐魯輝實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka本實(shí)驗(yàn)的知識(shí)地圖如圖10-1所示(表示重點(diǎn)表示難點(diǎn))。圖10-1實(shí)驗(yàn)10實(shí)戰(zhàn)Kafka知識(shí)地圖一、實(shí)驗(yàn)?zāi)康?.了解Kafka的功能。2.理解Kafka的體系架構(gòu)。3.熟練掌握Kafka集群的部署。4.熟練掌握KafkaShell常用命令的使用。5.了解KafkaAPI編程。二、實(shí)驗(yàn)環(huán)境本實(shí)驗(yàn)所需的軟件環(huán)境包括CentOS、OracleJDK1.6+、ZooKeeper集群、Kafka安裝包、Eclipse。三、實(shí)驗(yàn)內(nèi)容1.規(guī)劃Kafka集群。2.部署Kafka集群。3.啟動(dòng)Kafka集群。4.驗(yàn)證Kafka集群。5.使用KafkaShell:創(chuàng)建Topic、查看Topic,啟動(dòng)Producer生產(chǎn)消息,啟動(dòng)Consumer消費(fèi)消息。四、實(shí)驗(yàn)原理ApacheKafka是一個(gè)分布式流平臺(tái),允許發(fā)布和訂閱記錄流,用于在不同系統(tǒng)之間傳遞數(shù)據(jù),是Apache的頂級(jí)項(xiàng)目。(一)初識(shí)Kafka消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個(gè)應(yīng)用程序傳輸?shù)搅硪粋€(gè)應(yīng)用程序,這樣應(yīng)用程序可以專注于數(shù)據(jù),而不用擔(dān)心如何共享它。分布式消息傳遞基于可靠消息隊(duì)列的概念,消息在客戶端應(yīng)用程序和消息傳遞系統(tǒng)之間異步排隊(duì),有兩種類型的消息模型,一種是點(diǎn)對(duì)點(diǎn)消息模型,另一種是發(fā)布/訂閱消息模型。在點(diǎn)對(duì)點(diǎn)消息模型中,消息被保留在Queue中。消息生產(chǎn)者生產(chǎn)消息發(fā)送到Queue中,消息消費(fèi)者從Queue中取出并消費(fèi)消息。Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi),一旦消費(fèi)者讀取隊(duì)列中的消息,它就從該隊(duì)列中消失,不會(huì)產(chǎn)生重復(fù)消費(fèi)現(xiàn)象。該系統(tǒng)的典型示例是訂單處理系統(tǒng),其中每個(gè)訂單將由一個(gè)訂單處理器處理,但多個(gè)訂單處理器也可以同時(shí)工作。點(diǎn)對(duì)點(diǎn)消息模型的結(jié)構(gòu)如圖10-2所示。QueueQueue…Producer1Producer2ProducerN…Consumer1Consumer2ConsumerN圖10-2點(diǎn)對(duì)點(diǎn)消息模型結(jié)構(gòu)在發(fā)布/訂閱消息模型中,消息被保留在Topic中。消息生產(chǎn)者(發(fā)布者)將消息發(fā)布到Topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱者)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布者發(fā)送到Topic的消息,只有訂閱了Topic的訂閱者才會(huì)收到消息。發(fā)布/訂閱消息模型的結(jié)構(gòu)如圖10-3所示。TopicTopic…Publisher1Publisher2PublisherN…Subscriber1Subscriber2SubscriberN圖10-3發(fā)布/訂閱消息模型結(jié)構(gòu)ApacheKafka是一個(gè)分布式的、支持分區(qū)的、多副本的、基于ZooKeeper的發(fā)布/訂閱消息系統(tǒng),起源于LinkedIn開源出來的分布式消息系統(tǒng),2011年成為Apache開源項(xiàng)目,2012年成為Apache頂級(jí)項(xiàng)目,目前被多家公司采用。Kafka采用Scala和Java編寫,其設(shè)計(jì)目的是通過Hadoop和Spark等并行加載機(jī)制來統(tǒng)一在線和離線的消息處理,構(gòu)建在ZooKeeper上,目前與越來越多的分布式處理系統(tǒng)如ApacheStorm、ApacheSpark等都能夠較好的集成,用于實(shí)時(shí)流式數(shù)據(jù)分析。Kafka專為分布式高吞吐量系統(tǒng)而設(shè)計(jì),非常適合處理大規(guī)模消息,它與傳統(tǒng)消息系統(tǒng)相比,具有以下幾點(diǎn)不同:(1)Kafka是一個(gè)分布式系統(tǒng),易于向外擴(kuò)展。(2)Kafka同時(shí)為發(fā)布和訂閱提供高吞吐量。(3)Kafka支持多訂閱者,當(dāng)訂閱失敗時(shí)能自動(dòng)平衡消費(fèi)者。(4)Kafka支持消息持久化,消費(fèi)端為拉模型,消費(fèi)狀態(tài)和訂閱關(guān)系由客戶端負(fù)責(zé)維護(hù),消息消費(fèi)完后不會(huì)立即刪除,會(huì)保留歷史消息。(二)Kafka體系架構(gòu)Kafka整體架構(gòu)比較新穎,更適合異構(gòu)集群,其體系架構(gòu)如圖10-4所示。Kafka中主要有Producer、Broker和Customer三種角色,一個(gè)典型的Kafka集群包含多個(gè)Producer、多個(gè)Broker、多個(gè)ConsumerGroup和一個(gè)ZooKeeper集群。每個(gè)Producer可以對(duì)應(yīng)多個(gè)Topic,每個(gè)Consumer只能對(duì)應(yīng)一個(gè)ConsumerGroup,整個(gè)Kafka集群對(duì)應(yīng)一個(gè)ZooKeeper集群,通過ZooKeeper管理集群配置、選舉Leader以及在ConsumerGroup發(fā)生變化時(shí)進(jìn)行Rebalance。訂閱消息訂閱消息發(fā)布消息到Partition

ConsumerGroup2ConsumerGroup1Producer1Producer2Producer3Producer4KafkaBroker1KafkaBroker2KafkaBroker3Consumer1Consumer2Consumer3Consumer4Consumer5ZooKeeper圖10-4Kafka體系架構(gòu)在消息保存時(shí),Kafka根據(jù)Topic進(jìn)行分類,發(fā)送消息者稱為Producer,接收消息者稱為Customer,不同Topic的消息在物理上時(shí)分開存儲(chǔ)的,但在邏輯上用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處。這里,還需要解釋幾個(gè)名詞。(1)Message消息。Message是通信的基本單位,每個(gè)Producer可以向一個(gè)Topic發(fā)布一些消息,Kafka中的消息是以Topic為基本單位組織的,消息是無狀態(tài)的,消息消費(fèi)的先后順序是沒有關(guān)系的。每條Message包含三個(gè)屬性:offset,消息的唯一標(biāo)識(shí),類型為long;MessageSize,消息的大小,類型為int;data,消息的具體內(nèi)容,可以看作一個(gè)字節(jié)數(shù)組。(2)Topic主題。發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic,Kafka根據(jù)Topic對(duì)消息進(jìn)行歸類,發(fā)布到Kafka集群的每條消息都需要指定一個(gè)Topic。(3)Partition分區(qū)。物理上的概念,一個(gè)Topic可以分為多個(gè)Partition,每個(gè)Partition內(nèi)部都是有序的。每個(gè)Partition只能由一個(gè)Consumer來進(jìn)行消費(fèi),但是一個(gè)Consumer可以消費(fèi)多個(gè)Partition。(4)Broker消息中間件處理節(jié)點(diǎn)。一個(gè)Kafka集群由多個(gè)Kafka實(shí)例組成,每個(gè)實(shí)例被稱為Broker。一個(gè)Broker上可以創(chuàng)建一個(gè)或多個(gè)Topic,同一個(gè)Topic可以在同一Kafka集群下的多個(gè)Broker上分布,Broker與Topic的關(guān)系圖如圖10-5所示。Broker1Broker1Topic1Topic2Topic3Topic4Broker2Topic1Topic2Topic3Broker3Topic1Topic2圖10-5Broker與Topic關(guān)系圖(5)Producer消息生產(chǎn)者,向Broker發(fā)送消息的客戶端。(6)Consumer消息消費(fèi)者,從Broker讀取消息的客戶端。(7)ConsumerGroup每個(gè)Consumer屬于一個(gè)特定的ConsumerGroup,一條消息可以發(fā)送到多個(gè)不同的ConsumerGroup,但是一個(gè)ConsumerGroup中只能有一個(gè)Consumer能夠消費(fèi)該消息。關(guān)于Kafka體系架構(gòu)中涉及的重要構(gòu)件詳細(xì)說明如下。(1)Producer(生產(chǎn)者)Producer用于將流數(shù)據(jù)發(fā)送到Kafka消息隊(duì)列上,它的任務(wù)是向Broker發(fā)送數(shù)據(jù),通過ZooKeeper獲取可用的Broker列表。Producer作為消息的生產(chǎn)者,在生產(chǎn)消息后需要將消息投送到指定的目的地,即某個(gè)Topic的某個(gè)Partition。Producer可以選擇隨機(jī)的方式來發(fā)布消息到Partition,也支持選擇特定的算法發(fā)布消息到相應(yīng)的Partition。以日志采集為例,生產(chǎn)過程分為三部分:一是對(duì)日志采集的本地文件或目錄進(jìn)行監(jiān)控,若有內(nèi)容變化,則將變化的內(nèi)容逐行讀取到內(nèi)存的消息隊(duì)列中;二是連接Kafka集群,包括一些配置信息,諸如壓縮與超時(shí)設(shè)置等;三是將已經(jīng)獲取的數(shù)據(jù)通過上述連接推送(push)到Kafka集群。(2)BrokerKafka集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱為Broker,Broker可以理解為是Kafka服務(wù)器緩存代理。Kafka支持消息持久化,生產(chǎn)者生產(chǎn)消息后,Kafka不會(huì)直接把消息傳遞給消費(fèi)者,而是先在Broker中存儲(chǔ),持久化保存在Kafka的日志文件中。可以采用在Broker日志中追加消息的方式進(jìn)行持久化存儲(chǔ),并進(jìn)行分區(qū)(Partition),為了減少磁盤寫入的次數(shù),Broker會(huì)將消息暫時(shí)緩存起來,當(dāng)消息的個(gè)數(shù)達(dá)到一定閾值時(shí),再清空(flush)到磁盤,這樣就減少了IO調(diào)用的次數(shù)。Kafka的Broker采用的是無狀態(tài)機(jī)制,即Broker沒有副本,一旦Broker宕機(jī),該Broker的消息將都不可用,但是消息本身是持久化的,Broker在宕機(jī)重啟后讀取消息的日志就可以恢復(fù)消息。消息保存一定時(shí)間(通常為7天)后會(huì)被刪除。Broker不保存訂閱者狀態(tài),由訂閱者自己保存,消息訂閱者可以回退到任意位置重新進(jìn)行消費(fèi),當(dāng)訂閱者出現(xiàn)故障時(shí),可以選擇最小的offset進(jìn)行重新讀取并消費(fèi)消息。(3)Consumer(消費(fèi)者)Consumer負(fù)責(zé)訂閱Topic并處理消息,每個(gè)Consumer可以訂閱多個(gè)Topic,每個(gè)Consumer會(huì)保留它讀取到某個(gè)Partition的消息唯一標(biāo)識(shí)號(hào)(offset),Consumer是通過ZooKeeper來保留消息唯一標(biāo)識(shí)號(hào)(offset)。ConsumerGroup在邏輯上將Consumer分組,每個(gè)KafkaConsumer是一個(gè)進(jìn)程,所以一個(gè)ConsumerGroup中的Consumer將可能是由分布在不同機(jī)器上的不同進(jìn)程組成。Topic中的每一條消息可以被多個(gè)不同的ConsumerGroup消費(fèi),但是一個(gè)ConsumerGroup中只能有一個(gè)Consumer來消費(fèi)該消息。所以,若想要一個(gè)消息被多個(gè)Consumer消費(fèi),那么這些Consumer就必須在不同的ConsumerGroup中。因此,也可以理解為ConsumerGroup才是Topic在邏輯上的訂閱者。(三)安裝Kafka1.運(yùn)行環(huán)境部署與運(yùn)行Kafka所需要的系統(tǒng)環(huán)境,包括操作系統(tǒng)、Java環(huán)境、ZooKeeper集群三部分。1)操作系統(tǒng)Kafka支持不同操作系統(tǒng),例如GNU/Linux、Windows、MacOSX等。需要注意的是,在Linux上部署Kafka要比在Windows上部署能夠得到更高效的I/O處理性能。編者采用的操作系統(tǒng)為L(zhǎng)inux發(fā)行版CentOS7。2)Java環(huán)境Kafka使用Java語言編寫,因此它的運(yùn)行環(huán)境需要Java環(huán)境的支持。編者采用的Java為OracleJDK1.8。3)ZooKeeper集群Kafka依賴ZooKeeper集群,因此運(yùn)行Kafka之前需要首先啟動(dòng)ZooKeeper集群。Zookeeper集群可以自己搭建,也可以使用Kafka安裝包中內(nèi)置的shell腳本啟動(dòng)Zookeeper。編者采用自行搭建ZooKeeper集群,版本為3.4.13。2.運(yùn)行模式Kafka有兩種運(yùn)行模式:?jiǎn)螜C(jī)模式和集群模式。單機(jī)模式是只在一臺(tái)機(jī)器上安裝Kafka,主要用于開發(fā)測(cè)試,而集群模式則是在多臺(tái)機(jī)器上安裝ZooKeeper,也可以在一臺(tái)機(jī)器上模擬集群模式,實(shí)際的生產(chǎn)環(huán)境中均采用多臺(tái)服務(wù)器的集群模式。無論哪種部署方式,修改Kafka的配置文件perties都是至關(guān)重要的。單機(jī)模式和集群模式部署的步驟基本一致,只是在perties文件的配置上有些差異。3.配置文件安裝Kafka后,在$KAFKA_HOME/config中有多個(gè)配置文件,如圖10-6所示。圖10-6Kafka配置文件列表其中,配置文件perties部分配置參數(shù)及其含義如表10-1所示。表10-1perties配置參數(shù)(部分)參數(shù)名說明broker.id用于指定Broker服務(wù)器對(duì)應(yīng)的ID,各個(gè)服務(wù)器的值不同listeners表示監(jiān)聽端口,PLAINTEXT表示純文本,也就是說,不管發(fā)送什么數(shù)據(jù)類型都以純文本的方式接收,包括圖片,視頻等work.threads網(wǎng)絡(luò)線程數(shù),默認(rèn)是3num.io.threadsI/O線程數(shù),默認(rèn)是8socket.send.buffer.bytes套接字發(fā)送緩沖,默認(rèn)是100KBsocket.receive.buffer.bytes套接字接收緩沖,默認(rèn)是100KBsocket.request.max.bytes接收到的最大字節(jié)數(shù),默認(rèn)是100MBlog.dirs用于指定Kafka數(shù)據(jù)存放目錄,地址可以是多個(gè),多個(gè)地址需用逗號(hào)分割num.partitions分區(qū)數(shù),默認(rèn)是1num.recovery.threads.per.data.dir每一個(gè)文件夾的恢復(fù)線程,默認(rèn)是1log.retention.hours數(shù)據(jù)保存時(shí)間,默認(rèn)是168小時(shí),即一個(gè)星期(7天)log.segment.bytes指定每個(gè)數(shù)據(jù)日志保存最大數(shù)據(jù),默認(rèn)為1GB,當(dāng)超過這個(gè)值時(shí),會(huì)自動(dòng)進(jìn)行日志滾動(dòng)erval.ms設(shè)置日志過期的時(shí)間,默認(rèn)每隔300秒(即5分鐘)zookeeper.connect用于指定Kafka所依賴的ZooKeeper集群的IP和端口號(hào),地址可以是多個(gè),多個(gè)地址需用逗號(hào)分割zookeeper.connection.timeout.ms設(shè)置Zookeeper的連接超時(shí)時(shí)間,默認(rèn)為6秒,如果到達(dá)這個(gè)指定時(shí)間仍然連接不上就默認(rèn)該節(jié)點(diǎn)發(fā)生故障(四)KafkaShellKafka支持的所有命令在$KAFKA_HOME/bin下存放,如圖10-7所示。圖10-7KafkaShell命令Kafka常用命令描述如表10-2所示。表10-2Kafka常用命令命令功能描述kafka-server-start.sh啟動(dòng)KafkaBrokerkafka-server-stop.sh關(guān)閉KafkaBrokerkafka-topics.sh創(chuàng)建、刪除、查看、修改Topickafka-console-producer.sh啟動(dòng)Producer,生產(chǎn)消息,從標(biāo)準(zhǔn)輸入讀取數(shù)據(jù)并發(fā)布到Kafkakafka-console-consumer.sh啟動(dòng)Consumer,消費(fèi)消息,從Kafka讀取數(shù)據(jù)并輸出到標(biāo)準(zhǔn)輸出輸入命令“kafka-topics.sh--help”,即可查看該命令的使用幫助,如圖10-8所示,展示了命令“kafka-topics.sh”的幫助信息,使用該命令時(shí),必須指定以下5個(gè)選項(xiàng)之一:--list、--describe、--create、--alter、--delete。由于過長(zhǎng),此處僅展示部分內(nèi)容。圖10-8命令kafka-topics.sh幫助信息(五)KafkaAPIKafka支持5個(gè)核心的API,包括ProducerAPI、ConsumerAPI、StreamsAPI、ConnectAPI、AdminClientAPI。關(guān)于KafkaAPI的更多介紹讀者請(qǐng)參考官網(wǎng)/documentation/#api。五、實(shí)驗(yàn)步驟(一)規(guī)劃Kafka集群1.Kafka集群規(guī)劃Kafka有兩種運(yùn)行模式:?jiǎn)螜C(jī)模式和集群模式。編者擬配置3個(gè)Broker的Kafka集群,將Kafka集群運(yùn)行在Linux上,將使用3臺(tái)安裝有Linux操作系統(tǒng)的機(jī)器,主機(jī)名分別為master、slave1、slave2。具體Kafka集群的規(guī)劃如表10-3所示。表10-3Kafka集群部署規(guī)劃表主機(jī)名IP地址運(yùn)行服務(wù)軟硬件配置master30QuorumPeerMainKafka內(nèi)存:4GCPU:1個(gè)2核硬盤:40G操作系統(tǒng):CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave131QuorumPeerMainKafka內(nèi)存:1GCPU:1個(gè)1核硬盤:20G操作系統(tǒng):CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave232QuorumPeerMainKafka內(nèi)存:1GCPU:1個(gè)1核硬盤:20G操作系統(tǒng):CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.12.軟件選擇本實(shí)驗(yàn)中所使用各種軟件的名稱、版本、發(fā)布日期及下載地址如表10-4所示。表10-4本實(shí)驗(yàn)使用的軟件名稱、版本、發(fā)布日期及下載地址軟件名稱軟件版本發(fā)布日期下載地址VMwareWorkstationProVMwareWorkstation14.5.7ProforWindows2017年6月22日/products/workstation-pro.htmlCentOSCentOS7.6.18102018年11月26日/download/JavaOracleJDK8u1912018年10月16日/technetwork/java/javase/downloads/index.htmlZooKeeperZooKeeper3.4.132018年7月15日/releases.htmlKafkaKafka2.1.12019年2月15日/downloadsEclipseEclipseIDE2018-09forJavaDevelopers2018年9月/downloads/packages注意,編者的3個(gè)節(jié)點(diǎn)的機(jī)器名分別為master、slave1、slave2,IP地址依次為30、31、32,后續(xù)內(nèi)容均在表10-3規(guī)劃基礎(chǔ)上完成,請(qǐng)讀者務(wù)必確認(rèn)自己的機(jī)器名、IP等信息。由于本章之前已完成VMwareWorkstationPro、CentOS、Java的安裝,故本實(shí)驗(yàn)直接從安裝Kafka開始講述。(二)部署Kafka集群1.初始軟硬件環(huán)境準(zhǔn)備(1)準(zhǔn)備3臺(tái)機(jī)器,安裝操作系統(tǒng),編者使用CentOSLinux7。(2)對(duì)集群內(nèi)每一臺(tái)機(jī)器,配置靜態(tài)IP、修改機(jī)器名、添加集群級(jí)別域名映射、關(guān)閉防火墻。(3)對(duì)集群內(nèi)每一臺(tái)機(jī)器,安裝和配置Java,要求Java8或更高版本,編者使用OracleJDK8u191。(4)安裝和配置Linux集群中各節(jié)點(diǎn)間的SSH免密登錄。(5)在Linux集群上部署ZooKeeper集群。以上步驟編者已在教材實(shí)驗(yàn)1、實(shí)驗(yàn)4中詳細(xì)介紹,具體操作過程請(qǐng)讀者參見教材,此處不再贅述。2.獲取KafkaKafka官方下載地址為/downloads,編者選用的Kafka版本是2019年2月15日發(fā)布的Kafka2.1.1,其安裝包文件kafka_2.12-2.1.1.tgz例如存放在master機(jī)器的/home/xuluhui/Downloads中。讀者應(yīng)該注意到了,Kafka安裝包和一般安裝包的命名方式不一樣,例如kafka_2.12-2.1.1.tgz,其中2.12是Scala版本,2.1.1才是Kafka版本,官方強(qiáng)烈建議Scala版本和服務(wù)器上的Scala版本保持一致,避免引發(fā)一些不可預(yù)知的問題,故編者選用的是kafka_2.12-2.1.1.tgz,而非kafka_2.11-2.1.1.tgz。3.安裝Kafka以下所有操作需要在3臺(tái)機(jī)器上完成。切換到root,解壓kafka_2.12-2.1.1.tgz到安裝目錄如/usr/local下,使用命令如下所示。surootcd/usr/localtar-zxvf/home/xuluhui/Downloads/kafka_2.12-2.1.1.tgz4.配置Kafka修改Kafka配置文件perties,master機(jī)器上的配置文件$KAFKA_HOME/config/perties修改后的幾個(gè)參數(shù)如下所示。broker.id=0log.dirs=/usr/local/kafka_2.12-2.1.1/kafka-logszookeeper.connect=master:2181,slave1:2181,slave2:2181slave1和slave2機(jī)器上的配置文件$KAFKA_HOME/config/perties中參數(shù)broker.id依次設(shè)置為1、2,其余參數(shù)值與master機(jī)器相同。5.創(chuàng)建所需目錄以上第4步驟使用了系統(tǒng)不存在的目錄:Kafka數(shù)據(jù)存放目錄/usr/local/kafka_2.12-2.1.1/kafka-logs,因此需要?jiǎng)?chuàng)建它,使用的命令如下所示。mkdir/usr/local/kafka_2.12-2.1.1/kafka-logs6.設(shè)置$KAFKA_HOME目錄屬主為了在普通用戶下使用Kafka,將$KAFKA_HOME目錄屬主設(shè)置為L(zhǎng)inux普通用戶例如xuluhui,使用以下命令完成。chown-Rxuluhui/usr/local/kafka_2.12-2.1.17.在系統(tǒng)配置文件目錄/etc/profile.d下新建kafka.sh使用“vim/etc/profile.d/kafka.sh”命令在/etc/profile.d文件夾下新建文件kafka.sh,添加如下內(nèi)容。exportKAFKA_HOME=/usr/local/kafka_2.12-2.1.1exportPATH=$KAFKA_HOME/bin:$PATH其次,重啟機(jī)器,使之生效。此步驟可省略,之所以將$KAFKA_HOME/bin加入到系統(tǒng)環(huán)境變量PATH中,是因?yàn)楫?dāng)輸入Kafka命令時(shí),無需再切換到$KAFKA_HOME/bin,這樣使用起來會(huì)更加方便,否則會(huì)出現(xiàn)錯(cuò)誤信息“bash:****:commandnotfound...”。至此,Kafka在3臺(tái)機(jī)器上安裝和配置完畢。當(dāng)然,為了提高效率,讀者也可以首先僅在master一臺(tái)機(jī)器上完成Kafka的安裝和配置,然后使用“scp”命令在Kafka集群內(nèi)將master機(jī)器上的$KAFKA_HOME目錄和系統(tǒng)配置文件/etc/profile.d/kafka.sh遠(yuǎn)程拷貝至其他KafkaBroker如slave1、slave2,接著修改slave1、slave2上$KAFKA_HOME/config/perties中參數(shù)broker.id,最后設(shè)置其他KafkaBroker上$KAFKA_HOME目錄屬主。其中,同步Kafka目錄和系統(tǒng)配置文件kafka.sh到Kafka集群其它機(jī)器依次使用的命令如下所示,效果如圖10-9所示。scp-r/usr/local/kafka_2.12-2.1.1root@slave1:/usr/local/kafka_2.12-2.1.1scp-r/etc/profile.d/kafka.shroot@slave1:/etc/profile.d/kafka.shscp-r/usr/local/kafka_2.12-2.1.1root@slave2:/usr/local/kafka_2.12-2.1.1scp-r/etc/profile.d/kafka.shroot@slave2:/etc/profile.d/kafka.sh圖10-9使用scp命令同步Kafka目錄和系統(tǒng)配置文件kafka.sh到Kafka集群其它機(jī)器(如slave1)(三)啟動(dòng)Kafka集群首先,使用如下命令“zkServer.shstart”啟動(dòng)ZooKeeper集群,確保其正常運(yùn)行,效果如圖10-10所示。因?yàn)樵贚inux集群各機(jī)器節(jié)點(diǎn)間已配置好SSH免密登錄,所以可以僅在master一臺(tái)機(jī)器上輸入一系列命令以啟動(dòng)整個(gè)ZooKeeper集群。圖10-10啟動(dòng)ZooKeeper集群其次,在3臺(tái)機(jī)器上使用以下命令啟動(dòng)Kafka。若Linux集群各機(jī)器節(jié)點(diǎn)間已配置好SSH免密登錄,也可以僅在master一臺(tái)機(jī)器上輸入一系列命令以關(guān)閉整個(gè)Kafka集群。kafka-server-start.sh-daemon$KAFKA_HOME/config/perties這里需要注意的是,啟動(dòng)腳本若不加-daemon參數(shù),則如果執(zhí)行Ctrl+Z后會(huì)退出,且啟動(dòng)的進(jìn)程也會(huì)退出,所以建議加-daemon參數(shù),實(shí)現(xiàn)以守護(hù)進(jìn)程方式啟動(dòng)。(四)驗(yàn)證Kafka集群檢查Kafka是否啟動(dòng),可以使用命令“jps”查看Java進(jìn)程來驗(yàn)證,效果如圖10-11所示,可以看到,3臺(tái)機(jī)器上均有Kafka進(jìn)程,說明Kafka集群部署成功。圖10-11使用jps命令查看Kafka進(jìn)程(五)使用KafkaShell【案例10-1】使用Kafka命令創(chuàng)建Topic、查看Topic,啟動(dòng)Producer生產(chǎn)消息,啟動(dòng)Consumer消費(fèi)消息。(1)創(chuàng)建Topic在任意一臺(tái)機(jī)器上創(chuàng)建Topic“kafkacluster-test”,例如在master機(jī)器上完成,使用命令如下所示,運(yùn)行效果如圖10-12所示。kafka-topics.sh--create\--zookeepermaster:2181,slave1:2181,slave2:2181\--replication-factor3\--partitions3\--topickafkacluster-test由于總共部署了3個(gè)Broker,所以創(chuàng)建Topic時(shí)能指定--replication-factor3。圖10-12創(chuàng)建Topic運(yùn)行效果其中,選項(xiàng)--zookeeper用于指定ZooKeeper集群列表,可以指定所有節(jié)點(diǎn),也可以指定為部分節(jié)點(diǎn);選項(xiàng)--replication-factor為復(fù)制數(shù)目,數(shù)據(jù)會(huì)自動(dòng)同步到其他Broker上,防止某個(gè)Broker宕機(jī)數(shù)據(jù)丟失;選項(xiàng)--partitions用于指定一個(gè)Topic可以切分成幾個(gè)partition,一個(gè)消費(fèi)者可以消費(fèi)多個(gè)partition,但一個(gè)partition只能被一個(gè)消費(fèi)者消費(fèi)。(2)查看Topic詳情在任意一臺(tái)機(jī)器上查看Topic“kafkacluster-test”的詳情,例如在slave1機(jī)器上完成,使用命令如下所示,運(yùn)行效果如圖10-13所示。kafka-topics.sh--describe\--zookeepermaster:2181,slave1:2181,slave2:2181\--topickafkacluster-test圖10-13查看Topic詳情運(yùn)行效果命令“kafka-topics.sh--describe”的輸出解釋:第一行是所有分區(qū)的摘要,從第二行開始,每一行提供一個(gè)分區(qū)信息。Leader:該節(jié)點(diǎn)負(fù)責(zé)該分區(qū)的所有的讀和寫,每個(gè)節(jié)點(diǎn)的Leader都是隨機(jī)選擇的。replicas:副本的節(jié)點(diǎn)列表,不管該節(jié)點(diǎn)是否是Leader或者目前是否還活著,只是顯示。isr:“同步副本”的節(jié)點(diǎn)列表,也就是活著的節(jié)點(diǎn)并且正在同步Leader。從圖中可以看出,Topic“kafkacluster-test”總計(jì)有3個(gè)分區(qū)(PartitionCount),副本數(shù)為3(ReplicationFactor),且每個(gè)分區(qū)上有3個(gè)副本(通過Replicas的值可以得出),另外最后一列Isr(In-SyncReplicas)表示處理同步狀

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論