《大數據技術實戰案例教程》課件實驗指導書-實驗7Kafka實戰_第1頁
《大數據技術實戰案例教程》課件實驗指導書-實驗7Kafka實戰_第2頁
《大數據技術實戰案例教程》課件實驗指導書-實驗7Kafka實戰_第3頁
《大數據技術實戰案例教程》課件實驗指導書-實驗7Kafka實戰_第4頁
《大數據技術實戰案例教程》課件實驗指導書-實驗7Kafka實戰_第5頁
已閱讀5頁,還剩6頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

《大數據技術實戰案例教程》實驗指導書實驗7Kafka實戰編寫者:徐魯輝實驗7Kafka實戰7.1實驗目的1.理解Kafka體系架構及Broker、Producer和Customer三種角色功能。2.理解Kafka部署要點包括運行環境、運行模式、配置文件perties。3.熟練掌握在Linux環境下部署Kafka集群,使用KafkaShell命令完成分布式消息的發布和訂閱。7.2實驗環境本實驗所需的軟件環境包括Linux集群(至少3臺機器)、Java環境、ZooKeeper集群、Kafka安裝包。7.3實驗內容1.規劃Kafka集群。2.部署Kafka集群。3.啟動Kafka集群。4.驗證Kafka集群。5.使用KafkaShell命令完成分布式消息的發布和訂閱。。6.關閉Kafka集群。7.4實驗原理7.4.1初識KafkaApacheKafka是一個分布式的、支持分區的、多副本的、基于ZooKeeper的發布/訂閱消息系統,起源于LinkedIn公司開源出來的分布式消息系統,2011年成為Apache開源項目,2012年成為Apache頂級項目,目前被多家公司采用。Kafka采用Scala和Java編寫,其設計目的是通過Hadoop和Spark等并行加載機制來統一在線和離線的消息處理,構建在ZooKeeper上,不同的分布式系統可統一接入到Kafka,實現和Hadoop各組件之間不同數據的實時高效交換,被稱為“生態系統的交通樞紐”。目前與越來越多的分布式處理系統如ApacheStorm、ApacheSpark等都能夠較好的集成,用于實時流式數據分析。7.4.2Kafka體系架構Kafka整體架構比較新穎,更適合異構集群,其體系架構如圖7-1所示。Kafka中主要有Producer、Broker和Customer三種角色,一個典型的Kafka集群包含多個Producer、多個Broker、多個ConsumerGroup和一個ZooKeeper集群。每個Producer可以對應多個Topic,每個Consumer只能對應一個ConsumerGroup,整個Kafka集群對應一個ZooKeeper集群,通過ZooKeeper管理集群配置、選舉Leader以及在ConsumerGroup發生變化時進行負載均衡。訂閱消息訂閱消息發布消息到Partition

ConsumerGroup2ConsumerGroup1Producer1Producer2Producer3Producer4KafkaBroker1KafkaBroker2KafkaBroker3Consumer1Consumer2Consumer3Consumer4Consumer5ZooKeeper圖7-1Kafka體系架構在消息保存時,Kafka根據Topic對消息進行分類,發送消息者稱為Producer,接收消息者稱為Customer,不同Topic的消息在物理上是分開存儲的,但在邏輯上用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處。關于Kafka體系架構中涉及的重要構件詳細說明如下。(1)Producer(生產者)。Producer用于將流數據發送到Kafka消息隊列上,它的任務是向Broker發送數據,通過ZooKeeper獲取可用的Broker列表。Producer作為消息的生產者,在生產消息后需要將消息投送到指定的目的地,即某個Topic的某個Partition。Producer可以選擇隨機的方式來發布消息到Partition,也支持選擇特定的算法發布消息到相應的Partition。以日志采集為例,生產過程分為三部分:一是對日志采集的本地文件或目錄進行監控,若有內容變化,則將變化的內容逐行讀取到內存的消息隊列中;二是連接Kafka集群,包括一些配置信息,諸如壓縮與超時設置等;三是將已經獲取的數據通過上述連接推送(push)到Kafka集群。(2)Broker。Kafka集群中的一臺或多臺服務器統稱為Broker,Broker可以理解為是Kafka服務器緩存代理。Kafka支持消息持久化,生產者生產消息后,Kafka不會直接把消息傳遞給消費者,而是先在Broker中存儲,持久化保存在Kafka的日志文件中。可以采用在Broker日志中追加消息的方式進行持久化存儲,并進行分區(Partition),為了減少磁盤寫入的次數,Broker會將消息暫時緩存起來,當消息個數達到一定閾值時,清空(flush)緩存并寫入到磁盤,這樣就減少了IO調用的次數。Kafka的Broker采用的是無狀態機制,即Broker沒有副本,一旦Broker宕機,該Broker的消息將都不可用,但是消息本身是持久化的,Broker在宕機重啟后讀取消息的日志就可以恢復消息。消息保存一定時間(通常為7天)后會被刪除。Broker不保存訂閱者狀態,由訂閱者自己保存,消息訂閱者可以回退到任意位置重新進行消費,當訂閱者出現故障時,可以選擇最小的offset進行重新讀取并消費消息。(3)Consumer(消費者)。Consumer負責訂閱Topic并處理消息,每個Consumer可以訂閱多個Topic,每個Consumer會保留它讀取到某個Partition的消息唯一標識號(offset),Consumer是通過ZooKeeper來保留消息唯一標識號(offset)的。ConsumerGroup在邏輯上將Consumer分組,每個KafkaConsumer是一個進程,所以一個ConsumerGroup中的Consumer將可能是由分布在不同機器上的不同進程組成。Topic中的每一條消息可以被多個不同的ConsumerGroup消費,但是一個ConsumerGroup中只能有一個Consumer來消費該消息。所以,若想要一個消息被多個Consumer消費,那么這些Consumer就必須在不同的ConsumerGroup中。因此,也可以理解為ConsumerGroup才是Topic在邏輯上的訂閱者。7.4.3Kafka部署要點1.Kafka運行環境部署與運行Kafka所需要的系統環境,包括操作系統、Java環境、ZooKeeper集群三部分。1)操作系統Kafka支持不同操作系統,例如GNU/Linux、Windows、MacOSX等。需要注意的是,在Linux上部署Kafka要比在Windows上部署能夠得到更高效的I/O處理性能。編者采用的操作系統為Linux發行版CentOS7。2)Java環境Kafka使用Java語言編寫,因此它的運行環境需要Java環境的支持。編者采用的Java為OracleJDK1.8。3)ZooKeeper集群Kafka依賴ZooKeeper集群,因此運行Kafka之前需要首先啟動ZooKeeper集群。Zookeeper集群可以自己搭建,也可以使用Kafka安裝包中內置的shell腳本啟動Zookeeper。編者采用自行搭建ZooKeeper集群,版本為3.4.13。2.Kafka運行模式Kafka有兩種運行模式:單機模式和集群模式。單機模式是只在一臺機器上安裝Kafka,主要用于開發測試,而集群模式則是在多臺機器上安裝Kafka,也可以在一臺機器上模擬集群模式,實際的生產環境中均采用多臺服務器的集群模式。無論哪種部署方式,修改Kafka的配置文件perties都是至關重要的。單機模式和集群模式部署的步驟基本一致,只是在perties文件的配置上有些差異。3.Kafka配置文件在$KAFKA_HOME/config下有多個配置文件,其中,配置文件perties部分配置參數及其含義如表7-1所示。表7-1perties配置參數(部分)參數名說明broker.id用于指定Broker服務器對應的ID,各個服務器的值不同listeners表示監聽的地址及端口,PLAINTEXT表示純文本,也就是說,不管發送什么數據類型都以純文本的方式接收,包括圖片,視頻等work.threads網絡線程數,默認是3num.io.threadsI/O線程數,默認是8socket.send.buffer.bytes套接字發送緩沖,默認是100KBsocket.receive.buffer.bytes套接字接收緩沖,默認是100KBsocket.request.max.bytes接收到的最大字節數,默認是100MBlog.dirs用于指定Kafka數據存放目錄,地址可以是多個,多個地址需用逗號分割num.partitions分區數,默認是1num.recovery.threads.per.data.dir每一個文件夾的恢復線程,默認是1log.retention.hours數據保存時間,默認是168h,即一個星期(7天)log.segment.bytes指定每個數據日志保存最大數據,默認為1GB,當超過這個值時,會自動進行日志滾動erval.ms設置日志過期的時間,默認為300s(即5min)zookeeper.connect用于指定Kafka所依賴的ZooKeeper集群的IP和端口號,地址可以是多個,多個地址需用逗號分割zookeeper.connection.timeout.ms設置Zookeeper的連接超時時間,默認為6s,如果到達這個指定時間仍然連接不上就默認該節點發生故障7.4.4KafkaShell常用命令Kafka支持的所有命令在$KAFKA_HOME/bin下存放,如圖7-2所示。圖7-2KafkaShell命令Kafka常用命令的描述如表7-2所示。表7-2Kafka常用命令命令功能描述kafka-server-start.sh啟動KafkaBrokerkafka-server-stop.sh關閉KafkaBrokerkafka-topics.sh創建、刪除、查看、修改Topickafka-console-producer.sh啟動Producer,生產消息,從標準輸入讀取數據并發布到Kafkakafka-console-consumer.sh啟動Consumer,消費消息,從Kafka讀取數據并輸出到標準輸出輸入命令“kafka-topics.sh--help”,即可查看該命令的使用幫助,圖7-3展示了命令“kafka-topics.sh”的部分幫助信息,使用該命令時,必須指定以下5個選項之一:--list、--describe、--create、--alter、--delete。由于過長,此處僅展示部分內容。圖7-3命令kafka-topics.sh幫助信息(部分)7.5實驗步驟7.5.1規劃Kafka集群1.Kafka集群規劃編者擬配置3個Broker的Kafka集群,將Kafka集群運行在Linux上,將使用三臺安裝有Linux操作系統的機器,主機名分別為master、slave1、slave2。具體Kafka集群的規劃如表7-3所示。表7-3Kafka集群部署規劃表主機名IP地址運行服務軟硬件配置master30QuorumPeerMainKafka內存:4GCPU:1個2核硬盤:40G操作系統:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave131QuorumPeerMainKafka內存:1GCPU:1個1核硬盤:20G操作系統:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.1slave232QuorumPeerMainKafka內存:1GCPU:1個1核硬盤:20G操作系統:CentOS7.6.1810Java:OracleJDK8u191ZooKeeper:ZooKeeper3.4.13Kafka:Kafka2.1.12.軟件選擇本實驗部署Kafka集群所使用各種軟件的名稱、版本、發布日期及下載地址如表7-4所示。表7-4本實驗部署Kafka集群使用的軟件名稱、版本、發布日期及下載地址軟件名稱軟件版本發布日期下載地址VMwareWorkstationProVMwareWorkstation14.5.7ProforWindows2017年6月22日/products/workstation-pro.htmlCentOSCentOS7.6.18102018年11月26日/download/JavaOracleJDK8u1912018年10月16日/technetwork/java/javase/downloads/index.htmlZooKeeperZooKeeper3.4.132018年7月15日/releases.htmlKafkaKafka2.1.12019年2月15日/downloads注意,編者的三個節點的機器名分別為master、slave1、slave2,IP地址依次為30、31、32,后續內容均在表7-3規劃基礎上完成,請讀者務必與之對照確認自己的機器名、IP等信息。由于實驗1已完成VMwareWorkstationPro、CentOS、Java的安裝,故本實驗直接從安裝Kafka開始講述。7.5.2部署Kafka集群1.初始軟硬件環境準備(1)準備三臺機器,安裝操作系統,編者使用CentOSLinux7。(2)對集群內每一臺機器,配置靜態IP、修改機器名、添加集群級別域名映射、關閉防火墻。(3)對集群內每一臺機器,安裝和配置Java,要求Java8或更高版本,編者使用OracleJDK8u191。(4)安裝和配置Linux集群中各節點間的SSH免密登錄。(5)在Linux集群上部署ZooKeeper集群。以上步驟已在實驗1和實驗4中詳細介紹,此處不再贅述。2.獲取KafkaKafka官方下載地址為/downloads,編者選用的Kafka版本是2019年2月15日發布的Kafka2.1.1,其安裝包文件kafka_2.12-2.1.1.tgz例如存放在master機器的/home/xuluhui/Downloads中。讀者應該注意到了,Kafka安裝包和一般安裝包的命名方式不一樣,例如kafka_2.12-2.1.1.tgz,其中2.12是Scala版本,2.1.1才是Kafka版本,官方強烈建議Scala版本和服務器上的Scala版本保持一致,避免引發一些不可預知的問題,故編者選用的是kafka_2.12-2.1.1.tgz,而非kafka_2.11-2.1.1.tgz。3.安裝Kafka以下所有操作需要在三臺機器上完成。切換到root,解壓kafka_2.12-2.1.1.tgz到安裝目錄如/usr/local下,使用命令如下所示。[xuluhui@master~]$suroot[root@masterxuluhui]#cd/usr/local[root@masterlocal]#tar-zxvf/home/xuluhui/Downloads/kafka_2.12-2.1.1.tgz4.配置Kafka修改Kafka配置文件perties,master機器上的配置文件$KAFKA_HOME/config/perties修改后的幾個參數如下所示。broker.id=0log.dirs=/usr/local/kafka_2.12-2.1.1/kafka-logszookeeper.connect=master:2181,slave1:2181,slave2:2181slave1和slave2機器上的配置文件$KAFKA_HOME/config/perties中參數broker.id依次設置為1、2,其余參數值與master機器相同。5.創建所需目錄以上第4步驟使用了系統不存在的目錄:Kafka數據存放目錄/usr/local/kafka_2.12-2.1.1/kafka-logs,因此需要創建它,使用的命令如下所示。[root@masterlocal]#mkdir/usr/local/kafka_2.12-2.1.1/kafka-logs6.設置$KAFKA_HOME目錄屬主為了在普通用戶下使用Kafka,將$KAFKA_HOME目錄屬主設置為Linux普通用戶例如xuluhui,使用以下命令完成。[root@masterlocal]#chown-Rxuluhui/usr/local/kafka_2.12-2.1.17.在系統配置文件目錄/etc/profile.d下新建kafka.sh使用“vim/etc/profile.d/kafka.sh”命令在/etc/profile.d文件夾下新建文件kafka.sh,添加如下內容。exportKAFKA_HOME=/usr/local/kafka_2.12-2.1.1exportPATH=$KAFKA_HOME/bin:$PATH其次,重啟機器,使之生效。此步驟可省略,之所以將$KAFKA_HOME/bin加入到系統環境變量PATH中,是因為當輸入Kafka命令時,無需再切換到$KAFKA_HOME/bin,這樣使用起來會更加方便,否則會出現錯誤信息“bash:****:commandnotfound...”。至此,Kafka在三臺機器上安裝和配置完畢。當然,為了提高效率,讀者也可以首先僅在master一臺機器上完成Kafka的安裝和配置,然后使用“scp”命令在Kafka集群內將master機器上的$KAFKA_HOME目錄和系統配置文件/etc/profile.d/kafka.sh遠程拷貝至其他KafkaBroker如slave1、slave2,接著修改slave1、slave2上$KAFKA_HOME/config/perties中參數broker.id,最后設置其他KafkaBroker上$KAFKA_HOME目錄屬主。其中,同步Kafka目錄和系統配置文件kafka.sh到Kafka集群其它機器依次使用的命令如下所示。[root@masterkafka_2.12-2.1.1]#scp-r/usr/local/kafka_2.12-2.1.1root@slave1:/usr/local/kafka_2.12-2.1.1[root@masterkafka_2.12-2.1.1]#scp-r/etc/profile.d/kafka.shroot@slave1:/etc/profile.d/kafka.sh[root@masterkafka_2.12-2.1.1]#scp-r/usr/local/kafka_2.12-2.1.1root@slave2:/usr/local/kafka_2.12-2.1.1[root@masterkafka_2.12-2.1.1]#scp-r/etc/profile.d/kafka.shroot@slave2:/etc/profile.d/kafka.sh7.5.3啟動Kafka集群首先,在三臺機器上使用命令“zkServer.shstart”啟動ZooKeeper集群,確保其正常運行。其次,在三臺機器上使用以下命令啟動Kafka,此處以master機器為例。[xuluhui@master~]$kafka-server-start.sh-daemon$KAFKA_HOME/config/perties這里需要注意的是,啟動腳本若不加-daemon參數,則如果執行Ctrl+Z后會退出,且啟動的進程也會退出,所以建議加-daemon參數,實現以守護進程方式啟動。7.5.4驗證Kafka集群檢查Kafka是否啟動,可以使用命令“jps”查看Java進程來驗證,效果如圖7-4所示,可以看到,三臺機器上均有Kafka進程,說明Kafka集群部署成功。圖7-4使用jps命令查看Kafka進程7.5.5使用KafkaShell【案例7-1】使用Kafka命令創建Topic、查看Topic,啟動Producer生產消息,啟動Consumer消費消息。(1)創建Topic在任意一臺機器上創建Topic“kafkacluster-test”,例如在master機器上完成,使用命令如下所示。[xuluhui@master~]$kafka-topics.sh--create\--zookeepermaster:2181,slave1:2181,slave2:2181\--replication-factor3\--partitions3\--topickafkacluster-test由于共部署了三個Broker,所以創建Topic時能指定--replication-factor3。其中,選項--zookeeper用于指定ZooKeeper集群列表,可以指定所有節點,也可以指定為部分節點;選項--replication-factor為復制數目,數據會自動同步到其他Broker上,防止某個Broker宕機數據丟失;選項--partitions用于指定一個Topic可以切分成幾個partition,一個消費者可以消費多個partition,但一個partition只能被一個消費者消費。(2)查看Topic詳情在任意一臺機器上查看Topic“kafkacluster-test”的詳情,例如在slave1機器上完成,使用命令如下所示,運行效果如圖7-5所示。[xuluhui@slave1~]$kafka-topics.sh--describe\--zookeepermaster:2181,slave1:2181,slave2:2181\--topickafkacluster-test圖7-5查看Topic詳情運行效果命令“kafka-topics.sh--describe”的輸出解釋:第一行是所有分區的摘要,從第二行開始,每一行提供一個分區信息。Leader:該節點負責該分區的所有的讀和寫,每個節點的Leader都是隨機選擇的。replicas:副本的節點列表,不管該節點是否是Leader或者目前是否還活著,都會被顯示在副本的節點列表中。isr:“同步副本”的節點列表,也就是活著的節點并且正在同步Leader。從圖中可以看出,Topic“kafkacluster-test”總計有3個分區(PartitionCount),副本數為3(ReplicationFactor),且每個分區上有3個副本(通過Replicas的值可以得出),另外最后一列Isr(In-SyncReplicas)表示處

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論