《通信數據分析與實戰》課件-第六章 Kafka 分布式發布訂閱消息系統_第1頁
《通信數據分析與實戰》課件-第六章 Kafka 分布式發布訂閱消息系統_第2頁
《通信數據分析與實戰》課件-第六章 Kafka 分布式發布訂閱消息系統_第3頁
《通信數據分析與實戰》課件-第六章 Kafka 分布式發布訂閱消息系統_第4頁
《通信數據分析與實戰》課件-第六章 Kafka 分布式發布訂閱消息系統_第5頁
已閱讀5頁,還剩56頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

通信數據分析與實戰Kafka分布式發布訂閱消息系統第六章第1節2知道消息傳遞模式熟悉Kafka的核心組件學習目標TARGET消息傳遞模式簡介一個消息系統負責將數據從一個應用程序傳遞到另外一個應用程序中,應用程序只關注數據,無需關注數據在多個應用之間是如何傳遞的,分布式消息傳遞基于可靠的消息隊列,在客戶端應用和消息系統之間異步傳遞消息。消息系統有兩種主要消息傳遞模式,分別是點對點消息傳遞模式和發布訂閱消息傳遞模式。消息傳遞模式簡介1.倒排索引介紹1.倒排索引介紹1.點對點消息傳遞模式點對點消息傳遞模式結構中,消息是通過一個虛擬通道進行傳輸的,生產者發送一條數據,消息將持久化到一個隊列中,此時將有一個或多個消費者會消費隊列中數據,但是一條消息只能被消費一次,且消費后的消息會從消息隊列中刪除,因此,即使有多個消費者同時消費數據,數據都可以被有序處理。消息傳遞模式簡介1.倒排索引介紹2.發布訂閱消息傳遞模式在發布訂閱模式中,發布者用于發布消息,訂閱者用于訂閱消息,發布訂閱模式可以有多種不同的訂閱者,發布者發布的消息會被持久化到一個主題中,這與點對點模式不同的是,訂閱者可訂閱一個或多個主題,訂閱者可讀取該主題中所有數據,同一條數據可被多個訂閱者消費,數據被消費后也不會立即刪除。Kafka的概述Kafka是由Apache軟件基金會開發的一個開源流處理平臺,它由Scala和Java語言編寫,是一個基于Zookeeper系統的分布式發布訂閱消息系統,該項目的設計初衷是為實時數據提供一個統一、高通量、低等待的消息傳遞平臺。Kafka的概述應用程序A應用程序B高度解耦高吞吐低延遲擴展性持久性容錯性多語言Kafka的核心組件組件名稱相關說明Topic特定類別消息流稱為主題,數據存在主題中,主題被拆分成分區Partition主題的數據分割為一個或多個分區,每個分區的數據使用多個segment文件存儲,分區中的數據是有序的Offset每個分區消息具有的唯一序列標識Replica副本只是一個分區的備份,它們用于防止數據丟失Producer生產者即數據發布者,該角色將消息發布到Kafka集群主題中Kafka的核心組件組件名稱相關說明Consumer消費者可從Broker中讀取數據,可消費多個主題數據Broker每個Kafka服務節點都為Broker,Broker接收消息后,將消息追加到segment文件中Leader負責分區的所有讀寫操作Follower跟隨領導指令,若Leader發生故障則選一個Follower為新LeaderConsumerGroup實現一個主題消息的廣播和單播的手段Kafka的核心組件生產者主題分區一分區二分區三offsetoffsetoffset服務器節點備份消費組消費者一消費者二消費者三11小結知道消息傳遞模式熟悉Kafka的核心組件通信數據分析與實戰Kafka分布式發布訂閱消息系統第六章第2節13熟悉Kafka的工作原理學習目標TARGETKafka工作原理生產者向Kafka集群中生產消息。Producer是消息的生產者,通常情況下,數據消息源可是服務器日志、業務數據及Web服務數據等,生產者采用推送的方式將數據消息發布到Kafka的主題中,主題本質就是一個目錄,而主題是由PartitionLogs(分區日志)組成,每條消息都被追加到分區中。1.生產者生產消息過程Kafka工作原理1.生產者生產消息過程1Producer先讀取Zookeeper的“/brokers/.../state”節點中找到該Partition的Leader。2Producer將消息發送給Leader。3Leader負責將消息寫入本地分區Log文件中。Kafka工作原理1.生產者生產消息過程4Follower從Leader中讀取消息,完成備份操作。5Follower寫入本地Log文件后,會向Leader發送Ack,每次發送消息都會有一個確認反饋機制,以確保消息正常送達。6Leader收到所有Follower發送的Ack后,向Producer發送Ack,生產消息完成。Kafka工作原理1.生產者生產消息過程Kafka工作原理2.消費者消費消息過程Kafka采用拉取模型,由消費者記錄消費狀態,根據主題、Zookeeper集群地址和要消費消息的偏移量,每個消費者互相獨立地按順序讀取每個分區的消息,消費者消費消息的流程圖如下所示。19小結熟悉Kafka的工作原理通信數據分析與實戰Kafka分布式發布訂閱消息系統第六章第3節21掌握Kafka的安裝和啟動掌握Kafka基于命令行的使用學習目標TARGETKafka集群部署與測試1.安裝Kafka1下載Kafka安裝包,并解壓至hadoop01節點中的/export/software目錄下。2修改配置文件。在perties配置文件中指定broker編號、Kafka運行日志存放的路徑、指定Zookeeper地址和本地IP。3添加環境變量。在/etc/profile文件中添加Kafka環境變量。4分發文件。將Kafka安裝目錄kafka_2.11-2.0.0及環境配置文件profile分發至hadoop02、hadoop03上,并修改broker.id和。Kafka集群部署與測試1.安裝Kafka1下載Kafka安裝包,并解壓至hadoop01節點中的/export/software目錄下。#1.切換到軟件包存放目錄cd/export/software#2.將kafka_2.11-2.0.0.tgz上傳到指定位置rz#3.解壓到指定目錄/export/servers/tar-zxvfkafka_2.11-2.0.0.tgz/export/servers/Kafka集群部署與測試1.安裝Kafka#broker的全局唯一編號,不能重復broker.id=0#kafka運行日志存放的路徑log.dirs=/export/data/kafka/#broker需要使用zookeeper保存meta數據zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181#刪除topicdelete.topic.enable=true#設置本機IP=hadoop01Kafka集群部署與測試1.安裝Kafkavi/etc/profile

exportKAFKA_HOME=/export/servers/kafka_2.11-2.0.0exportPATH=$PATH:$KAFKA_HOME/binKafka集群部署與測試1.安裝Kafkacd/export/serversscp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp/etc/profilehadoop02:/etc/profilescp/etc/profilehadoop03:/etc/profile#分別在hadoop010203上面激活profileSourceprofileKafka集群部署與測試1.安裝Kafka

#hadoop02

broker.id=1=hadoop02

#hadoop03

broker.id=2=hadoop03cd/export/servers/kafka_2.11-2.0.0/confvipertiesKafka集群部署與測試啟動Zookeeper服務Kafka集群部署與測試啟動Kafka服務cd/export/servers/kafka_2.11-2.0.0bin/kafka-server-start.shconfig/perties基于命令行方式使用Kafka1.創建主題$kafka-topics.sh--create\--topicittopic\--partitions3\--replication-factor2\--zookeeperhadoop01:2181,hadoop02:2181,hadoop03:2181基于命令行方式使用Kafka$kafka-console-producer.sh\--broker-listhadoop01:9092,hadoop02:9092,hadoop03:9092\--topicittopic--hellokafka2.向主題中發送消息數據基于命令行方式使用Kafka3.消費主題中的消息$kafka-console-consumer.sh\--from-beginning--topicittopic\--bootstrap-serverhadoop01:9092,hadoop02:9092,hadoop03:909233小結掌握Kafka的安裝和啟動掌握Kafka基于命令行的使用通信數據分析與實戰Kafka分布式發布訂閱消息系統第六章第4節35掌握Kafka的生產者實例掌握Kafka的消費者實例學習目標TARGETKafka生產者消費者實例1.基于JavaAPI方式使用Kafka

用戶不僅能夠通過命令行的形式操作Kafka服務,Kafka還提供了許多編程語言的客戶端工具,用戶在開發獨立項目時,通過調用KafkaAPI來操作Kafka集群,其核心API主要有5種,分別是ProducerAPI、ConsumerAPI、StreamsAPI、ConnectAPI、AdminClientAPI。Kafka生產者消費者實例KafkaProducer常用API方法名稱相關說明abortTransaction?()終止正在進行的事物close?()關閉這個生產者flush?()調用此方法使所有緩沖的記錄立即發送partitionsFor?(java.lang.Stringtopic)獲取給定主題的分區元數據send?(ProducerRecord<K,V>record)異步發送記錄到主題Kafka生產者消費者實例KafkaConsumer常用API方法名稱相關說明subscribe?(java.util.Collection<java.lang.String>topics)訂閱給定主題列表以獲取動態分區close?()關閉這個消費者wakeup?()喚醒消費者metrics?()獲取消費者保留的指標listTopics?()獲取有關用戶有權查看的所有主題的分區的元數據Kafka生產者消費者實例操作1創建一個名為“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依賴。2創建KafkaProducerTest文件用于生產消息數據并將數據發送到Kafka集群。3通過KafkaAPI創建KafkaConsumerTest對象,用于消費Kafka集群中名為“ittopic”主題的消息數據。Kafka生產者消費者實例操作消費者消費消息效果圖Kafka生產者消費者實例操作1創建一個名為“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依賴。<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency></dependencies>Kafka生產者消費者實例操作2創建KafkaProducerTest文件用于生產消息數據并將數據發送到Kafka集群。publicclassKafkaProducerTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();//1、指定Kafka集群的IP地址和端口號props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//2、指定等待所有副本節點的應答props.put("acks","all");

//3、指定消息發送最大嘗試次數props.put("retries",0);

//4、指定一批消息處理大小props.put("batch.size",16384);

//5、指定請求延時props.put("linger.ms",1);Kafka生產者消費者實例操作2創建KafkaProducerTest文件用于生產消息數據并將數據發送到Kafka集群。

//6、指定緩存區內存大小

props.put("buffer.memory",33554432);

//7、設置key序列化

props.put("key.serializer","mon.serialization.StringSerializer");

//8、設置value序列化

props.put("value.serializer","mon.serialization.StringSerializer");

//9、生產數據

KafkaProducer<String,String>producer=newKafkaProducer<String,String>(props);for(inti=0;i<50;i++){producer.send(newProducerRecord<String,String>("ittopic",Integer.toString(i),"helloworld-"+i));}producer.close();Kafka生產者消費者實例操作3通過KafkaAPI創建KafkaConsumer對象,用于消費Kafka集群中名為“ittopic”主題的消息數據。publicclassKafkaConsumerTest{publicstaticvoidmain(String[]args){

//1.準備配置文件Propertiesprops=newProperties();

//2.指定Kafka集群主機名和端口號props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//3.指定消費者組ID,在同一時刻同一消費組中只有//一個線程可以去消費一個分區消息,不同的消費組可以去消費同一個分區的消息。props.put("group.id","ittopic");

//4.自動提交偏移量props.put("mit","true");Kafka生產者消費者實例操作3通過KafkaAPI創建KafkaConsumer對象,用于消費Kafka集群中名為“ittopic”主題的消息數據。//5.自動提交時間間隔,每秒提交一次

props.put("erval.ms","1000");props.put("key.deserializer","mon.serialization.StringDeserializer");props.put("value.deserializer","mon.serialization.StringDeserializer");KafkaConsumer<String,String>kafkaConsumer=newKafkaConsumer<String,String>(props);

//6.訂閱消息,這里的topic可以是多個

kafkaConsumer.subscribe(Arrays.asList("ittopic"));Kafka生產者消費者實例操作3通過KafkaAPI創建KafkaConsumer對象,用于消費Kafka集群中名為“ittopic”主題的消息數據。//7.獲取消息

while(true){//每隔100ms就拉去一次

ConsumerRecords<String,String>records=kafkaConsumer.poll(100);for(ConsumerRecord<String,String>record:records){System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n",record.topic(),record.offset(),record.key(),record.value());}}47小結掌握Kafka的生產者實例掌握Kafka的消費者實例通信數據分析與實戰Kafka分布式發布訂閱消息系統第六章第5節49熟悉KafkaStreams的作用掌握KafkaStreams的案例學習目標TARGETKafka

Streams概述

KafkaStreams是ApacheKafka開源的一個流處理框架,基于Kafka的生產者和消費者,為開發者提供流式處理能力,具有低延遲性、高擴展性、彈性、容錯的特點,易于集成到現有應用程序中。它是一套處理分析Kafka中存儲數據的客戶端類庫,處理完的數據可重新寫回Kafka,也可發送給外部存儲系統。Kafka

Streams概述在流式計算框架模型中,通常需要構建數據流的拓撲結構,例如生產數據源、分析數據的處理器及處理完后發送的目標節點,Kafka流處理框架同樣將“輸入主題自定義處理器輸出主題”抽象成一個DAG拓撲圖。生產者作為數據源不斷生產和發送消息至Kafka的testStreams1主題中,通過自定義處理器對每條消息執行相應計算邏輯,最后將結果發送到Kafka的testStreams2主題中供消費者消費消息數據。Kafka

Streams案例1在spark_chapter06項目中,打開pom.xml文件,添加KafkaStreams依賴。2創建LogProcessor類,并繼承StreamsAPI中的Processor接口,實現單詞計數業務邏輯。3單詞計數的業務功能開發完成后,KafkaStreams需要編寫一個運行主程序的類App,用來測試LogProcessor業務程序。Kafka

Streams案例4在hadoop01節點創建testStreams1和testStreams2主題。5分別在hadoop01和hadoop02節點啟動生產者服務和消費者服務。6運行App主程序類。在生產者服務節點(hadoop01)中輸入測試語句,返回消費者服務節點(hadoop02)中查看執行結果。Kafka

Streams案例<dependency>?

<groupId>org.apache.kafka</groupId>?

<artifactId>kafka-streams</artifactId>?

<version>2.0.0</version></dependency>Kafka

Streams案例publicclassLogProcessorimplementsProcessor<byte[],byte[]>{

//上下文對象

privateProcessorContextprocessorContext;

@Override

publicvoidinit(ProcessorContextprocessorContext){//初始化方法

this.processorContext=processorContext;}

@Override

publicvoidprocess(byte[]key,byte[]value){

//處理一條消息

StringinputOri=newString(value);

HashMap<String,Integer>map=newHashMap<String,Integer>();

inttimes=1;

if(inputOri.contains("")){

//截取字段

String[]words=inputOri.split("");

for(Stringword:words){

if(map.containsKey(word)){

map.put(word,map.get(word)+1);

}else{

map.put(word,times);}}}

inputOri=map.toString();

processorContext.forward(key,inputOri.getBytes());}

@Override

publicvoidclose(){}Kafka

Streams案例publicclassApp{

publicstaticvoidmain(String[]args){

StringfromTopic="testStreams1";

//聲明來源主題

StringtoTopic="testStreams2";

//聲明目標主題

Propertiesprops=newProperties();

//設置參數

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092,hadoop03:9092");

//實例化StreamsConfig

StreamsConfigconfig=newStreamsConfig(props);

//構建拓撲結構

Topologytopology=newTopology();

//添加源處理節點,為源處理節點指定名稱和它訂閱的主題

topology.addSource("SOURCE",fromTopic)

//添加自定義處理節點,指定名稱,處理器類和上一個節點的名稱

.addProcessor("PROCESSOR",newProcessorSupplier(

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論