




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
本課件包括演示文稿、示例、代碼、題庫、視頻和聲音等內容,北風網和講師擁有完全知識產權;只限于善意學習者在本課程使用,不得在課程范圍外向任何第三方散播。任何其他人或者機構不得盜版、復制、仿造其中的創意和內容,我們保留一切通過法律手段追究違反者的權利。課程詳情請咨詢微信公眾號:北風教育官方網址:/法律聲明消息訂閱框架KAFKA高吞吐量的分布式發布訂閱消息系統主講人:Gerry上海育創網絡科技有限公司課上課下“九字”真言認真聽,善摘錄,勤思考多溫故,樂實踐,再發散四不原則不懶散惰性,不遲到早退不請假曠課,不拖延作業一點注意事項違反“四不原則”,不包就業和推薦就業課程要求嚴格是大愛寄語Kafka初識Kafka功能架構
Kafka重要概念
Kafka安裝部署與測試
Kafka
Producer講解
Kafka
Consumer講解
Kafka與Flume集成
Kafka與Log4j集成
Kafka集群監控學習內容與目標It
lets
you
publish
and
subscribe
to
streams
of
records.
In
this
respect
itis
similar
to
a
message
queue
or
enterprise
messaging
systemKafka?
is
a
distributed,
partitioned,
replicated
commit
log
serviceKafka?
isusedfor
building
real-timedata
pipelines
apps.Features:Horizontally
Scalable:水平可擴展(擴展性)
Fault-tolerant:容錯(容錯性&可用性&可靠性)
Fast:快速Distributed:分布式What
is
Kafka??由于Kafka存在高容錯、高擴展、分布式等特性,Kafka主要應用場景如下:消息系統日志收集系統
Metrics監控系統Kafka適用場景Node2Zookeeper負責Kafka元數據管理以及Consumer相關數據管理Node1Producers1Producers3Producers2Topic1Topic2Topic3Consumers1Consumers2分區1分區1分區3分區2分區3分區1Broker2分區2Broker2分區1
分區2分區2分區3Broker1分區1分區2分區2分區2Broker1分區1分區3分區1分區3Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間)Broker(代理者):Kafka集群中的機器/服務被成為broker,是一個物理概念。Topic(主題):維護Kafka上的消息類型被稱為Topic,是一個邏輯概念。Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic可以包含多個分區;Partition特性:ordered&immutable。(在數據的產生和消費過程中,不需要關注數據具體存儲的Partition在那個Broker上,只需要指定Topic即可,由Kafka負責將數據和對應的Partition關聯上)Producer(生產者):負責將數據發送到Kafka對應Topic的進程
Consumer(消費者):負責從對應Topic獲取數據的進程ConsumerGroup(消費者組):每個consumer都屬于一個特定的group組,一個group組可以包含多個consumer,但一個組中只會有一個consumer消費數據。Kafka基本信息術語Kafka是由LinkedIn公司開發的,之后貢獻給Apache基金會,成為Apache的一個頂級項目,開發語言為Scala。提供了各種不同語言的API,具體參考Kafka的cwiki頁面;安裝方式主要由三種,分別是:單機、偽分布式、完全分布式;其中偽分布式和完全分布式基本一樣安裝步驟:安裝JAVA和Scala安裝Zookeeper安裝KafkaKafka安裝介紹下載安裝包、解壓并配置環境變量KAFKA_HOME修改配置文件${KAFKA_HOME}/conf/perties。如果是偽分布式,那么需要在的單臺機器上copy多個perties文件;如果是完全分布式,那么需要將修改好的KAFKA完全copy到其他機器上啟動Kafka服務,啟動命令如下(偽分布式):${KAFKA_HOME}/bin/kafka-server-start.sh
xxx/perties${KAFKA_HOME}/bin/kafka-server-start.sh
xxx/perties關閉服務使用${KAFKA_HOME}/bin/kafka-server-stop.sh進行操作Kafka安裝(偽分布式)Kafka安裝配置項(一)Kafka安裝配置項(二)創建Topic列出Topic查看Topic信息修改Topic啟動Kafka自帶Producer和Consumer進行數據測試Kafka基本操作一個Kafka的Message由一個固定長度的header和一個變長的消息體body組成header部分由一個字節的magic(文件格式)和四個字節的CRC32(用于判斷body消息體是否正常)構成。當magic的值為1的時候,會在magic和crc32之間多一個字節的數據:attributes(保存一些相關屬性,比如是否壓縮、壓縮格式等等);如果magic的值為0,那么不存在attributes屬性body是由N個字節構成的一個消息體,包含了具體的key/value消息備注:每個版本的Kafka消息格式是不一樣的Kafka發送消息格式存儲在磁盤的日志采用不同于Producer發送的消息格式,每個日志文件都是一個
“logentries”序列,每一個logentry包含一個四字節整型數(message長度,值為1+4+N),一個字節的magic,四個字節的CRC32值,最終是N個字節的消息數據。每條消息都有一個當前Partition下唯一的64位offset,指定該消息的起始下標位置,存儲消息格式如下:Kafka
Log消息格式(一)這個“logentries”并非由一個文件構成,而是分成多個segmentfile(日志文件,存儲具體的消息記錄)和一個索引文件(存儲每個segment文件的offset偏移量范
圍)。結構如右圖所示:Kafka
Log消息格式(二)一個Topic分為多個Partition來進行數據管理,一個Partition中的數據是有序、不可變的,使用偏移量(offset)唯一標識一條數據,是一個long類型的數據Partition接收到producer發送過來數據后,會產生一個遞增的offset偏移量數據,同時將數據保存到本地的磁盤文件中(文件內容追加的方式寫入數據);Partition中的數據存活時間超過參數值(log.retention.{ms,minutes,hours},默認7天)的時候進行刪除(默認)Consumer根據offset消費對應Topic的Partition中的數據(也就是每個Consumer消費的每個Topic的Partition都擁有自己的offset偏移量)注意:Kafka的數據消費是順序讀寫的,磁盤的順序讀寫速度(600MB/sec)比隨機讀寫速度(100k/sec)快Kafka消息存儲機制(一)Kafka消息存儲機制(二)一個Topic中的所有數據分布式的存儲在kafka集群的所有機器(broker)上,以分區(partition)的的形式進行數據存儲;每個分區允許存在備份數據/備份分區(存儲在同一kafka集群的其它broker上的分區)每個數據分區在Kafka集群中存在一個broker節點上的分區叫做leader,存儲在其它broker上的備份分區叫做followers;只有leader節點負責該分區的數據讀寫操作,followers節點作為leader節點的熱備節點,從leader節點備份數據;當
leader節點掛掉的時候,followers節點中會有一個節點變成leader節點,重新提供服務Kafka集群的Partition的leader和followers切換依賴ZookeeperKafka分布式機制Kafka集群中由producer負責數據的產生,并發送到對應的Topic;Producer通過push的方式將數據發送到對應Topic的分區Producer發送到Topic的數據是有key/value鍵值對組成的,Kafka根據key的不同的值決定數據發送到不同的Partition,默認采用Hash的機制發送數據到對應
Topic的不同Partition中,配置參數為{partitioner.class}Producer發送數據的方式分為sync(同步)和async(異步)兩種,默認為同步方式,由參數{producer.type}決定;當發送模式為異步發送的時候,Producer提供重
試機制,默認失敗重試發送3次Kafka消息產生/收集機制Kafka有兩種模式消費數據:隊列和發布訂閱;在隊列模式下,一條數據只會發送給customergroup中的一個customer進行消費;在發布訂閱模式下,一條數據會發送給多個customer進行消費Kafka的Customer基于offset對kafka中的數據進行消費,對于一個customergroup中的所有customer共享一個offset偏移量Kafka中通過控制Customer的參數{group.id}來決定kafka是什么數據消費模式,如果所有消費者的該參數值是相同的,那么此時的kafka就是類似于隊列模式,
數據只會發送到一個customer,此時類似于負載均衡;否則就是發布訂閱模式Kafka消息消費機制(一)Kafka的數據是按照分區進行排序的(插入的順序),也就是每個分區中的數據是有序的。在Consumer進行數據消費的時候,也是對分區的數據進行有序的消費的,但是不保證所有數據的有序性(多個分區之間)Consumer
Rebalance:當一個consumer
group組中的消費者數量和對應
Topic的分區數量一致的時候,此時一個Consumer消費一個Partition的數據;如果不一致,那么可能出現一個Consumer消費多個Partition的數據或者不消費數據的情況,這個機制是根據Consumer和Partition的數量動態變化的Consumer通過poll的方式主動從Kafka集群中獲取數據Kafka消息消費機制(二)Kafka消息消費機制(三)Kafka的Replication指的是Partition的復制,一個Partition的所有分區中只有一個分區是leader節點,其它分區是follower節點。Replication對Kafka的吞吐率有一定的影響,但是極大的增強了可用性Follower節點會定時的從leader節點上獲取增量數據,一個活躍的follower節點必須滿足一下兩個條件:所有的節點必須維護和zookeeper的連接(通過zk的heartbeat實現)follower必須能夠及時的將leader上的writing復制過來,不能“落后太多”;“落后太多”由參數{replica.lag.time.max.ms}和{replica.lag.max.messages}決定KafkaReplicationKafka提供了一個in-syncreplicas(ISR)來確保Kafka的Leader選舉,ISR是一個保存分區node的集合,如果一個node宕機了或數據“落后太多”,leader會將該node節點從ISR中移除,只有ISR中的follower節點才有可能成為leader節點Leader節點的切換基于Zookeeper的Watcher機制,當leader節點宕機的時候,其他ISR中的follower節點會競爭的在zk中創建一個文件目錄(只會有一個
follower節點創建成功),創建成功的follower節點成為leader節點Kafka
Leader
ElectionMessageDeliverySemantics是消息系統中數據傳輸的可靠性保證的一個定義,主要分為三種類型:At
most
once(最多一次):消息可能會丟失,但是不可能重復發送
At
least
once(最少一次):消息不可能丟失,但是可能重復發送
Exactly
once(僅僅一次):消息只發送一次,但不存在消息的丟失Kafka的Producer通過參數{request.required.acks}來定義確定Producer和Broker之間是那種消息傳遞類型Kafka的數據是分區存儲的,每個分區中的數據是按照進入kafka的時間進行排序的,這樣不需要為每條數據存儲一個元數據(是否消費),只需要為每個Consumer記錄一個對應分區數據消費的最高標記位,Kafka中叫做“偏移量”(offset)Message
Delivery
Semantics消息集(messageset)二進制傳輸
順序讀取磁盤
“零”拷貝端到端數據壓縮Why
Kafkais
Fast?消息集(message
set):Producer可以將多條消息一次發送給Kafka集群,
Kafka可以一次將所有的數據追加到文件中,減少磁盤零碎的磁盤IO;同時
consumer也可以一次性的請求一個數據集的數據二進制傳輸:同時消息在傳遞過程中是基于二進制進行傳遞的,不需要進行反序列化,在高負載的情況下,對性能是有一定的提升的順序讀寫磁盤:Kafka的所有數據操作都是基于文件操作的,而操作文件的方式都是順序讀寫,而順序讀寫磁盤的速度會比隨機讀寫快6000倍左右Why
Kafkais
Fast?“零”拷貝:在Kafka服務中,數據發送到consumer的過程中采用的是“零拷貝”,比普通的讀寫文件方式減少了兩次操作,速度能夠提高50%端到端數據壓縮:Producer可以需要發送的數據/數據集進行壓縮后發送到
Kafka集群,Kafka集群直接將數據保存到文件,然后Consumer消費數據的時候,將壓縮后的數據獲取到,進行解壓縮操作。在性能瓶頸是網絡帶寬的情況下,非
常有效。默認情況下,kafka支持gzip和snappy壓縮Why
Kafkais
Fast?“零”拷貝Kafka分別提供了基于Java和Scala的API,由于Kafka不僅僅只是在大數據中使用到,所以Kafka的JavaAPI應用的比較多。基于Maven進行Kafka的開發,KafkaMaven依賴如下:Kafka
API<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>${kafka.version}</version></dependency>Kafka
Producer涉及到的配置信息(一)參數名稱默認參數值備注metadata.broker.list指定kafka服務器監聽的主機名和端口號列表,不同服務器之間使用“,”進行分割request.required.acks0指定producer需要等待broker返回數據成功接收標識;
0表示不等待,1表示等待一個broker返回結果,-1表示等待所有broker返回結果request.timeout.ms10000當acks參數配置的時候,指定producer等待連接過期的時間毫米數producer.typesync指定producer發送數據的方式是異步(async)還是同步(sync)serializer.classkafka.serializer.DefaultEncoder指定producer發送數據的時候數據/消息編碼器,即將消息轉換為byte數組的編碼器key.serializer.class指定producer發送數據的時候key類型的數據編碼器,默認使用${serializer.class}給定的值partitioner.classducer.DefaultPartitioner指定producer發送數據的數據分區器,默認采用hash進行數據分區操作;該參數的主要功能是:決定數據到底發送到那一個分區中Kafka
Producer涉及到的配置信息(二)參數名稱默認參數值備注compression.codecnone給定發送數據是否進行壓縮設置,默認不進行壓縮;參數可選:none、gzip、snappymessage.send.max.retries3指定數據發送失敗,重試次數,默認3次retry.backoff.ms100在數據重新發送過程中,producer會刷新topic的元數據信息(leader信息),由于topic元數據的變化需要一點點時間,故該參數指定的值主要用于在producer刷新元數據之前的等待時間erval.ms600000給定producer中topic元數據周期性刷新的間隔時間,默認10分鐘;當該參數給定的值為負數的時候,topic元數據的刷新只有在發送數據失敗后進行刷新;當該參數給定為0的時候,每次發送數據后都進行元數據刷新(不推薦);注意:元數據的刷新是在發送數據后觸發的,如果永遠不發送數據,那么元數據不會被刷新queue.buffering.max.ms5000當數據傳輸方式是async(異步)的時候,指定數據在producer端停留的最長時間,該參數對于數據吞吐量有一定的影響,當時會增加數據的延遲性queue.buffering.max.messages10000當數據傳輸方式為async(異步)的時候,指定producer端最多允許臨時保存的最大數據量,當數據量超過該值的時候,發送一次數據Kafka
Producer涉及到的配置信息(三)參數名稱默認參數值備注queue.enqueue.timeout.ms-1當數據發送方式為async(異步),而且等待隊列數據填充滿的時候{queue.buffering.max.messages},一條新的數據過來,最大阻塞時間;設置為0表示,不阻塞,當隊列滿的時候,直接將新數據刪除(不發送);當設置為正數的時候,表示等待給定毫秒數后,進行重試操作,失敗則數據刪除(不發送);設置為-1表示一直等待,直到隊列允許添加數據batch.num.messages200當數據發送方式為async(異步)的時候,producer一個批次發送的數據條數;當producer中的數據量達到該參數${batch.num.messages}的設置值或者數據停留時間超過參數${queue.buffering.max.ms}的時候,觸發producer發送數據的動作(實際發送數據量可能不超過該參數值)send.buffer.bytes102400指定producer端數據緩存區大小,默認值為:10KBKafka
Producer開發參考頁面:
/082/documentation.html#producerapi
/081/documentation.html#producerconfigs
/081/documentation.html#apidesign
/081/documentation.html#producerapiKafka的Producer
API主要提供下列三個方法:public
void
send(KeyedMessage<K,V>
message)發送單條數據到Kafka集群public
void
send(List<KeyedMessage<K,V>>
messages)發送多條數據(數據集)到Kafka集群public
voidclose()關閉Kafka連接資源案例:使用Java語言實現一個Kafka
Producer程序并測試Kafka
Producer
APIKafka
Consumer涉及到的配置信息(一)參數名稱默認參數值備注group.idConsumer的groupid值,如果多個Consumer的groupid的值一樣,那么表示這多個Consumer屬于同一個group組zookeeper.connectKafka元數據Zookeeper存儲的url,和配置文件中的參數一樣consumer.id消費者id字符串,如果不給定的話,默認自動產生一個隨機idsocket.timeout.ms30000Consumer連接超時時間,實際超時時間是socket.timeout.ms+
max.fetch.waitsocket.receive.buffer.bytes65536接收數據的緩沖區大小,默認64kbfetch.message.max.bytes1048576指定每個分區每次獲取數據的最大字節數,一般該參數要求比
message允許的最大字節數要大,否則可能出現producer產生的數據consumer沒法消費num.consumer.fetchers1Consumer獲取數據的線程數量mit.enabletrue是否自動提交offset偏移量,默認為true(自動提交)erval.ms60000自動提交offset偏移量的間隔時間Kafka
Consumer涉及到的配置信息(二)參數名稱默認參數值備注rebalance.max.retries4當一個新的Consumer添加到Consumer
Group的時候,會觸發數據消費的
rebalance操作;rebalance操作可能會失敗,該參數的主要作用是設置
rebalance的最大重試次數fetch.min.bytes1一個請求最少返回記錄大小,當一個請求中的返回數據大小達到該參數的設置值后,記錄數據返回到consumer中fetch.wait.max.ms100一個請求等待數據返回的最大停留時間rebalance.backoff.ms2000rebalance重試過程中的間隔時間auto.offset.resetlargest指定consumer消費kafka數據的時候offset初始值是啥,可選參數:largest和smallest;smallest指該consumer的消費offset是當前kafka數據中的最小偏移量;largest指該consumer的消費offset是當前kafka數據中的最大偏移量consumer.timeout.ms-1給定當consumer多久時間沒有消費數據后,拋出異常;-1表示不拋出異常zookeeper.session.timeout.ms6000zk會話時間zookeeper.connection.timeout.ms6000連接zk過期時間Kafka提供了兩種Consumer
API,分別是:High
Level
Consumer
API
和Lower
Level
Consumer
API(Simple
Consumer
API)High
Level
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 學校日常微管理制度
- 學校運動區管理制度
- 學生接送車管理制度
- 孵化廠銷售管理制度
- 安全及消防管理制度
- 安全運行與管理制度
- 實名制入井管理制度
- 實驗室培訓管理制度
- 客戶為中心管理制度
- 宣講員聘用管理制度
- 學校(幼兒園)每周食品安全排查治理報告(整學期16篇)
- 延期交房起訴狀開發商違約金起訴狀
- 心內科用藥安全管理課件
- GB/T 20453-2022柿子產品質量等級
- 贛美2011版三年級美術下冊《瓜果飄香》教案及教學反思
- 維修改造工程施工組織設計
- 執行力案例分享與解析課件
- 電路理論知到章節答案智慧樹2023年同濟大學
- 新版心肺復蘇流程圖
- 與食品安全相關的組織機構設置、部門職能和崗位職責
- 法院送達地址確認書
評論
0/150
提交評論