




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、 Apache Kafka-大數(shù)據(jù)實(shí)時(shí)處理平臺(tái)介紹InfoQ 微信號(hào) infoqchina功能介紹 有內(nèi)容的技術(shù)社區(qū)媒體對(duì)于 Apache Kafka 的使用范疇已經(jīng)遠(yuǎn)不僅是分布式的消息系統(tǒng):我們可以將每一次用戶點(diǎn)擊,每一個(gè)數(shù)據(jù)庫(kù)更改,每一條日志的生成,都轉(zhuǎn)化成實(shí)時(shí)的結(jié)構(gòu)化數(shù)據(jù)流,更早的存儲(chǔ)和分析它們,并從中獲得價(jià)值。同時(shí),越來(lái)越多的企業(yè)應(yīng)用也開始從批處理數(shù)據(jù)平臺(tái)向?qū)崟r(shí)的流數(shù)據(jù)數(shù)據(jù)平臺(tái)轉(zhuǎn)移。本演講將介紹最近 Apache Kafka 添加的一些系統(tǒng)架構(gòu),包括 Kafka Connect 和 Kafka Streams,并且描述一些如何使用它們的實(shí)際應(yīng)用體驗(yàn)。 流處理 在流處理剛被提出來(lái)的時(shí)候,
2、很多人認(rèn)為流處理只能進(jìn)行做近似的結(jié)果或者增量的計(jì)算,倘若你想保證其安全性,以 Lamda 架構(gòu)為基礎(chǔ),利用流處理得到最現(xiàn)在的結(jié)果。但同時(shí)你需要采用 batch processing 等其他方式來(lái)保證其全局的安全性以正確性。在如此多年的研究結(jié)果下,在我看來(lái),流處理并不一定是近似的,或者是僅僅以無(wú)法保證真確性為代價(jià)而提高速度的一種數(shù)據(jù)處理方式。相反,流處理應(yīng)該是一個(gè)與全局計(jì)算、batch processing 稍微有點(diǎn)不同的計(jì)算模型。跟批量處理不同之處在于,批量處理將數(shù)據(jù)引向計(jì)算,而流處理將計(jì)算引向數(shù)據(jù)。這句話大概有點(diǎn)模糊,接下來(lái),我舉幾個(gè)大家熟悉的計(jì)算模型例子。第一個(gè)計(jì)算模型例子請(qǐng)求應(yīng)答模型。請(qǐng)
3、求應(yīng)答模型是業(yè)務(wù)生活中最常用的模型例子。首先提交一個(gè)請(qǐng)求到服務(wù)方,而服務(wù)方可能是一個(gè)數(shù)據(jù)庫(kù)、也可能是別的存儲(chǔ)工具;然后進(jìn)行等待等待;最后得到一個(gè)回答。這便是一次請(qǐng)求、一次計(jì)算、一次回答。該模型非常簡(jiǎn)單、也極易操作,當(dāng)你需要延展到多個(gè)機(jī)器上時(shí),只要簡(jiǎn)單地增加客戶端以及處理器即可成功。但是缺點(diǎn)在于,不能達(dá)到大的吞吐量,每提交一次請(qǐng)求,都需要等待時(shí)間來(lái)獲得最終應(yīng)答的結(jié)果。第二種常見的模型就是批量處理如上圖所示。如果請(qǐng)求應(yīng)答模型在譜系的一端,那么 typo 的另一端則認(rèn)為是批量處理。當(dāng)我積累數(shù)據(jù)數(shù)量足夠多的時(shí)候,一次性提交任務(wù)到數(shù)據(jù)倉(cāng)庫(kù),再進(jìn)行等待,等待時(shí)間短則幾秒鐘、幾分鐘,長(zhǎng)則幾小時(shí),最后才得到最
4、終的結(jié)果所有輸入對(duì)應(yīng)的所有輸出。該批處理模型的好處在于能夠提高其吞吐率,一次的請(qǐng)求和應(yīng)答可以得出較多結(jié)果。但它的缺點(diǎn)是具有高延時(shí)性,比如某數(shù)據(jù)產(chǎn)生時(shí)間為上午 6 點(diǎn)鐘,用戶點(diǎn)擊某網(wǎng)頁(yè),由于批處理模型,每 12 小時(shí)才會(huì)運(yùn)行一次,那么它必須等到上午 6 點(diǎn)到下午 6 點(diǎn)的所有數(shù)據(jù)完整以后才會(huì)進(jìn)行工作,那么運(yùn)行結(jié)果可能是用戶點(diǎn)擊的 12 個(gè)小時(shí)之后。高延遲性是批處理自身帶有的特性。那么什么是流處理呢? 在我看來(lái),流處理就是介于請(qǐng)求應(yīng)答和批處理之間的一種新型計(jì)算模型或者編程模型。流處理并不等待數(shù)據(jù)的完整性,或者說數(shù)據(jù)本沒有完整性這一講法,數(shù)據(jù)本身就是一個(gè)數(shù)據(jù)流,當(dāng)每個(gè)數(shù)據(jù)流每產(chǎn)生一個(gè)新數(shù)據(jù)的時(shí)候立刻
5、被計(jì)算出、進(jìn)行返回,因此數(shù)據(jù)是源源不斷地通向計(jì)算,并且源源不斷有結(jié)果被輸出。你可以設(shè)想,與等待數(shù)據(jù)完全完成之后發(fā)布到計(jì)算上相比,流處理就是將計(jì)算移到你數(shù)據(jù)發(fā)生地進(jìn)行實(shí)時(shí)計(jì)算的方式。為什么很多人之前有這樣一種錯(cuò)覺,他們認(rèn)為流處理可能存在有丟包的情況、或者說只可以得到近似的結(jié)果,其實(shí)這是早期的一些數(shù)據(jù)流處理系統(tǒng)所自帶的一些限制。因此以 Lamda 架構(gòu)為基礎(chǔ),在流處理上需要討論不同維度的取舍。接下里我將舉三個(gè)例子,延遲、成本和正確性。正如很多人之前提及的,在進(jìn)行流處理時(shí)候,其大多數(shù)情況需要用時(shí)間來(lái)?yè)Q取正確性,或者用更多的成本換取時(shí)間等等。第一個(gè)例子,說如果你需要做一個(gè)實(shí)時(shí)的 ETL 處理。而關(guān)于
6、ETL 處理不需要太小的延遲,為達(dá)到低成本的一種保證,我們可以忍受幾分鐘或者 1 分鐘的延遲;但是,如果你正在進(jìn)行一個(gè)實(shí)時(shí)的在線監(jiān)測(cè),存在著幾毫秒的延遲,那么這時(shí)候可能更愿意選擇花大量的金錢,或者采取一些可能不必要的 possibility 來(lái)達(dá)到一種低延遲的效果;第二個(gè)例子,假設(shè)你在做一個(gè)在線付費(fèi)協(xié)議,它也是一個(gè)流處理平臺(tái)。由于在線付費(fèi)協(xié)議可能關(guān)乎到其機(jī)構(gòu),或者其公司的利益所在,因此你會(huì)說,我需要保證百分之百的正確性,我不希望有任何丟包情況;第三個(gè)例子,如果你是做一個(gè)實(shí)時(shí)的日志處理,實(shí)時(shí)收集所有日志,并將其導(dǎo)入 root,在這種情況下,你可能會(huì)說,為了降低成本,我愿意付出一小部分正確性的代價(jià)
7、,即使不能達(dá)到 100%、達(dá)到 99.99%、達(dá)到 99.9%,這樣的結(jié)果都可以接受。這本是用戶在定義不同流處理應(yīng)用或者業(yè)務(wù)的時(shí)候應(yīng)該可以自己做出的選擇。但比較遺憾的是,多數(shù)早期的流處理平臺(tái)其實(shí)并沒有給予用戶該種選擇,他們自身的設(shè)計(jì)理念,那就是為了低延遲直接放棄掉正確性,或者說為了更高的吞吐量直接放棄低延遲。以上是我想分享的關(guān)于流處理的一些誤會(huì)認(rèn)知,如果我的分享能夠讓大家?guī)ё邇蓚€(gè)答案的話,我希望這就是一個(gè)。我認(rèn)為流處理僅僅是一種不一樣的計(jì)算模型或者編程模型,它將計(jì)算帶到數(shù)據(jù)上,而不是將數(shù)據(jù)引用到計(jì)算上,并且在流處理的時(shí)候,用戶往往需要在正確性、延遲性、成本等不同的維度上做出選擇。Kafka 的
8、角色 為什么當(dāng)我們說到流處理的時(shí)候,很多人都在說 Kafka。大多數(shù)人在最早接觸 Kafka 時(shí)會(huì)說,Kafka 就是一個(gè)分布式發(fā)布訂閱的消息系統(tǒng),但是如果我們?nèi)ビ^察 Kafka 的最初一些設(shè)計(jì)特性可發(fā)現(xiàn)以下幾點(diǎn)內(nèi)容。第一點(diǎn),它可以作為一個(gè)寫在磁盤上的緩存來(lái)使用,或者說,并不是僅基于內(nèi)存來(lái)存儲(chǔ)流數(shù)據(jù),它可以保證數(shù)據(jù)包不被及時(shí)消費(fèi)時(shí),依然可用且不被丟失;第二點(diǎn),由于位移的存在提供了邏輯上的順序,在同一個(gè)話題上,第一個(gè)數(shù)據(jù)比第二個(gè)數(shù)據(jù)最先被發(fā)布的時(shí)候,也可保證在消費(fèi)時(shí)也是永遠(yuǎn)第一個(gè)數(shù)據(jù)比第二個(gè)數(shù)據(jù)先被消費(fèi);第三點(diǎn),因?yàn)?Kafka 是一個(gè)公有的大數(shù)據(jù)中轉(zhuǎn)站,就是說,所有的數(shù)據(jù)只要在 Kafka 上
9、,永遠(yuǎn)可以在 Kafka 周圍進(jìn)行業(yè)務(wù)的開發(fā)或者認(rèn)知事物的開發(fā)。接下來(lái)我將花費(fèi)一些時(shí)間詳細(xì)介紹這三點(diǎn)之間的關(guān)系。Kafka 不僅僅是一個(gè)訂閱消息系統(tǒng),同時(shí)也是一個(gè)大規(guī)模的流數(shù)據(jù)平臺(tái),那么它提供了什么呢?第一,提供訂閱和發(fā)布消息;第二,提供一個(gè)緩存的流數(shù)據(jù)存儲(chǔ)平臺(tái);第三,提供流數(shù)據(jù)的處理平臺(tái)。今天,我將著重討論流式計(jì)算在 Kafka 上面的應(yīng)用。流式計(jì)算在 Kafka 上的應(yīng)用主要有哪些選項(xiàng)呢?第一個(gè)選項(xiàng)就是 DIY,Kafka 提供了兩個(gè)客戶端 一個(gè)簡(jiǎn)單的發(fā)布者和一個(gè)簡(jiǎn)單的消費(fèi)者,我們可以使用這兩個(gè)客戶端進(jìn)行簡(jiǎn)單的流處理操作。舉個(gè)簡(jiǎn)單的例子,利用消息消費(fèi)者來(lái)實(shí)時(shí)消費(fèi)數(shù)據(jù),每當(dāng)?shù)玫叫碌南M(fèi)數(shù)據(jù)時(shí)
10、,可做一些計(jì)算的結(jié)果,再通過數(shù)據(jù)發(fā)布者發(fā)布到 Kafka 上,或者將它存儲(chǔ)到第三方存儲(chǔ)系統(tǒng)中。DIY 的流處理需要成本。打個(gè)比方,考慮數(shù)據(jù)的延遲性,考慮不同時(shí)間上的管理分配,正如很多人提到的 processing time,這將是我后文會(huì)重點(diǎn)提及的概念。以上這些都說明,利用 DIY 做流處理任務(wù)、或者做流處理業(yè)務(wù)的應(yīng)用都不是非常簡(jiǎn)單的一件事情。第二個(gè)選項(xiàng)是進(jìn)行開源、閉源的流處理平臺(tái)。比如,spark。關(guān)于流處理平臺(tái)的一個(gè)公有認(rèn)知的表示是,如果你想進(jìn)行流處理操作,首先拿出一個(gè)集群,且該集群包含所有必需內(nèi)容,比如,如果你要用 spark,那么必須用 spark 的 runtime。因?yàn)樗麄儎澏四?/p>
11、作為一個(gè)流處理平臺(tái)使用者需要用到的所有行為,比如,資源管理系統(tǒng)、參數(shù)調(diào)配系統(tǒng)、容器配置、代碼封裝、分發(fā)等,以上行為都已被該平臺(tái)所限定。一旦你選擇使用甲就必須用甲套餐裝備,如果選擇使用乙就必須使用乙套餐裝備。有人不禁提出疑問,我能不能既選擇流處理平臺(tái),又使用自己選擇的,我能不能這樣做呢?這個(gè)應(yīng)用場(chǎng)景其實(shí)很普遍,舉個(gè)例子,可異步式微服務(wù)處理。什么叫異步式微服務(wù)處理?假設(shè) Kafka 作為一個(gè)緩存數(shù)據(jù),在該緩存區(qū)含有很多不同的業(yè)務(wù)。打個(gè)比方,一個(gè)網(wǎng)店的機(jī)構(gòu)可以有不同的組、不同的員工,有人負(fù)責(zé)銷售、有人負(fù)責(zé)商品分發(fā),有人負(fù)責(zé)價(jià)格管理、有人負(fù)責(zé)在線實(shí)時(shí)的限流監(jiān)控,不同的組、不同的員工可能會(huì)以不同的時(shí)間,
12、或者以不同的代碼來(lái)更新他們的產(chǎn)品,只要擁有一個(gè)異步式緩存機(jī)制,即 Kafka,便可擴(kuò)大該微服務(wù),而不需要他們的任何一個(gè)組之間進(jìn)行同步請(qǐng)求應(yīng)答機(jī)制。在該微服務(wù)情況下,每個(gè)小組的喜好、特性并不一致,有的組表示我需要做流處理平臺(tái),從 Kafka 讀數(shù)據(jù),處理完再寫回 Kafka,并且想要使用 EWS 把我的應(yīng)用部署在云端大規(guī)模集群上;而另外小組表示我不需要那么復(fù)雜,我只是小規(guī)模數(shù)據(jù),不希望起一個(gè)集群,只需起三個(gè)機(jī)器,并且每個(gè)機(jī)器有 1GB 內(nèi)存足以,可進(jìn)行手動(dòng)控制操作,不需要資源管理器。那么我們能不能同時(shí)滿足他們不同的需求呢? 答案就是我接下來(lái)要說的第三種選項(xiàng)。第三種選項(xiàng)是使用一個(gè)輕量級(jí)流處理的庫(kù),
13、而不需要使用一個(gè)廣泛、復(fù)雜的框架或者平臺(tái)來(lái)滿足他們不同的需求。在 Kafka 0.10 當(dāng)中已發(fā)布輕量級(jí)流處理內(nèi)容平臺(tái),我們可以設(shè)想,跟其他客戶端發(fā)布者和消費(fèi)者一樣,它也是一個(gè)客戶端,不同之處在于它是一個(gè)計(jì)算者客戶端,一個(gè)好用的、功能強(qiáng)大的客戶端,并且支持 state processing、Windows 延時(shí)的、異步的、甚至不同數(shù)據(jù)的調(diào)控。 最重要的是 Kafka 作為一個(gè)庫(kù),可以采用多種方法來(lái)發(fā)布流處理平臺(tái)的使用。比如,你可以構(gòu)建一個(gè)集群;你可以把它作為一個(gè)手提電腦來(lái)使用;甚至還可以在黑莓上運(yùn)行 Kafka。以上都是尤其簡(jiǎn)單的運(yùn)行庫(kù)的概念。因此我們要做的事情與使用 Kafka 其他的客戶端
14、類似,比如發(fā)布者、消費(fèi)者,只要在代碼里邊加入就可以使用各種各樣的 API。當(dāng)你要調(diào)配控制 Kafka Stream 應(yīng)用的時(shí)候,選擇最基礎(chǔ)的 War File 來(lái)運(yùn)行或者采用 Java、C,甚至資源管理器來(lái)運(yùn)行都是可行的。因?yàn)?Kafka Stream 是一個(gè)輕量級(jí)流處理的庫(kù),可支持各種各樣的運(yùn)維方式。在我們看來(lái),簡(jiǎn)單的就是美的,只有給用戶提供最大的兼容性與最大的延展性,用戶才能得到最好的用戶體驗(yàn)。Kafka Stream 的編程語(yǔ)言 如果接觸過 Storm、Spark 等流處理平臺(tái)的同學(xué)可以發(fā)現(xiàn),它們與 Kafka Stream 高階位 DSL 語(yǔ)言其實(shí)有相似之處。如上圖所示,首先定義一個(gè)
15、Streams 流, Streams 是從 topic1 中的 topic 獲取得到,即定義 Streams、處理 Streams、得到新的 Streams。比如,從 topic1 里面得到兩個(gè)原始數(shù)據(jù)流,然后數(shù)據(jù)流進(jìn)行 countByKey 得到新的數(shù)據(jù)流叫做 Counts。那么 counts.to(“topic2”) 是什么意思呢?在獲取到新的數(shù)據(jù)流之后寫回 Kafka topic2 內(nèi),啟動(dòng) KafkaStreams 進(jìn)程,與 Kafka producer、Kafka consumer 類似,讓它來(lái)運(yùn)行已定義計(jì)算。正如大家所了解的,API 的使用其實(shí)很簡(jiǎn)單。提供一個(gè)簡(jiǎn)單的 API,用戶簡(jiǎn)
16、單地寫入運(yùn)行邏輯即可運(yùn)行。但是編程應(yīng)用總是容易的,而它的復(fù)雜程度在于,一旦你開始運(yùn)維該應(yīng)用,當(dāng)你想要把業(yè)務(wù)拓展到更大規(guī)模,或者業(yè)務(wù)出現(xiàn)變化,或者集群不穩(wěn)定,需要強(qiáng)大的運(yùn)維時(shí),運(yùn)維的程度便顯得異常重要,最上面的編程可能只是冰山一角。Kafka Stream 的設(shè)計(jì)理念是最簡(jiǎn)單的就是最美的,包括 API、運(yùn)維、debugging,以及各種各樣的方式,都是希望給用戶帶來(lái)最簡(jiǎn)單的體驗(yàn)。它的核心思想就是把難問題直接給 Kafka 集群本身。Kafka 的介紹 Kafka 的核心思想是什么?就是把這些消息全部存成一個(gè)有序日志,所有的消息發(fā)布者把消息發(fā)布到底端,從某一個(gè)邏輯上的位移開始順序讀取所有的消息。它
17、的一個(gè)好處在于所有的讀和寫,盡管都是刷到磁盤上,但都是按照順序進(jìn)行,該方式對(duì)磁盤的使用比較有效,倘若消費(fèi)者和發(fā)布者隔得比較近,將利用 page cash 直接讀數(shù)據(jù)。延展性。如上圖,提供 topic 以及 topic partitions,即話題與話題分區(qū)的機(jī)制。每個(gè)用戶有不同的 topic,每個(gè) topic 可以有多個(gè)分區(qū),每個(gè)分區(qū)可被裝載在不同的機(jī)器上,當(dāng)用戶提高規(guī)模之后,Kafka 只需要簡(jiǎn)單地增加機(jī)器和 topic partitions 數(shù)量,或者采用 ROM balance 的方式到不同機(jī)器上,即可達(dá)到線性延展方式。以上是 Kafka 最簡(jiǎn)單的核心思想,接下來(lái)我將介紹 Kafka S
18、treams 作為 Kafka 客戶端如何利用以上核心思想來(lái)設(shè)計(jì)流處理的平臺(tái)。數(shù)據(jù)流其實(shí)就是有序的記錄或消息,每個(gè)消息是一個(gè) Key 加一個(gè) Value,并且 record 與 Kafka 自身 massage 具有一一對(duì)應(yīng)關(guān)系。用戶所提供的業(yè)務(wù)上的計(jì)算模型,其實(shí)可用拓補(bǔ)結(jié)構(gòu)進(jìn)行表達(dá)。如上圖,圖的左邊。用戶首先進(jìn)行定義數(shù)據(jù)流,然后對(duì)數(shù)據(jù)流進(jìn)行計(jì)算,得到新的數(shù)據(jù)流,最終將數(shù)據(jù)流寫回到 Kafka 內(nèi)。每當(dāng)用戶進(jìn)行定義的時(shí)候,每一步都會(huì)變成拓?fù)浣Y(jié)構(gòu)里面的一個(gè)點(diǎn),每個(gè)點(diǎn)通過流進(jìn)行計(jì)算,變成新的流來(lái)進(jìn)行新的連接,最終在 Kafka 內(nèi)部形成拓?fù)浣Y(jié)構(gòu)。用戶并不需要在意該拓補(bǔ)結(jié)構(gòu),只需明白定義流、計(jì)算流、
19、得到新的流,寫回 Kafka。連接每一個(gè)不同的運(yùn)算單元就是一個(gè) Stream,即 record stream,每一個(gè) Stream 都在源源不斷地實(shí)時(shí)產(chǎn)生 record,每一個(gè) record 是一個(gè) key 加一個(gè) value。利用 Stream Processor 連接 Stream,每個(gè)用戶定義的流的一個(gè)計(jì)算單位對(duì)應(yīng)著一個(gè) Stream Processor。當(dāng)用戶定義每一步計(jì)算的時(shí)候,就是定義每個(gè)拓?fù)浣Y(jié)構(gòu)里面的每個(gè)點(diǎn),最終把整個(gè)拓補(bǔ)結(jié)構(gòu)定義完整到 Kafka Stream 來(lái)運(yùn)行。計(jì)算單元其實(shí)可分成兩個(gè)特殊的單元,一個(gè)叫做元的計(jì)算單元,只有輸出流,沒有輸入流,它們唯一的認(rèn)同就是從 Kafk
20、a 讀取數(shù)據(jù)形成數(shù)據(jù)流,傳遞給下方其他數(shù)據(jù)處理。而 Stream Processor 底端的數(shù)據(jù)流,沒有輸出流,只有輸入流,它們的功能是把所有輸入流寫回到 Kafka。Kafka 的運(yùn)行操作簡(jiǎn)單,源數(shù)據(jù)從 Kafka log 讀取消息變成數(shù)據(jù)流,每個(gè)消息貫穿整個(gè)拓?fù)浣Y(jié)構(gòu),最終從 Stream Processor 寫回到 Kafka。以上為 Kafka Stream 運(yùn)行情況。用戶進(jìn)行并行發(fā)布進(jìn)程、應(yīng)用或者多個(gè)計(jì)算的操作其實(shí)也非常簡(jiǎn)單。Kafka 是一個(gè)庫(kù),當(dāng)你用 Kafka 庫(kù)寫成應(yīng)用,當(dāng) record 寫入多臺(tái)機(jī)器時(shí),Kafka Stream 庫(kù)本身就會(huì)自動(dòng)調(diào)動(dòng) partitions 方式,
21、假設(shè)你有兩臺(tái)機(jī)器,每臺(tái)機(jī)器上都運(yùn)行了 Kafka Streams,當(dāng)它同時(shí)進(jìn)行運(yùn)行時(shí),不同的 streams application instance 就會(huì)從不同的 Kafka partitions 內(nèi)讀取數(shù)據(jù)來(lái)達(dá)到并行任務(wù)的分發(fā)與執(zhí)行,任務(wù)之間沒有任何的數(shù)據(jù)重疊,當(dāng)你需要更多線性地增長(zhǎng)任務(wù)時(shí),你只需要在不同的機(jī)器上運(yùn)行同樣的 record,所有的 instance 將會(huì)自動(dòng)進(jìn)行 rebalance,把新的 application 寫入,然后獲取到延展。很多人看到不同的計(jì)算方式的時(shí)候會(huì)發(fā)現(xiàn),有的計(jì)算方式,比如說 fliter、map,沒有“計(jì)算狀態(tài)”需要保存,一個(gè)數(shù)據(jù)進(jìn)來(lái)計(jì)算、一個(gè)數(shù)據(jù)出去。但
22、是有的計(jì)算,比如說 join、aggregate,就需要?jiǎng)討B(tài)維護(hù)一個(gè)“計(jì)算狀態(tài)”,每一次新的信息或者日志進(jìn)來(lái)的時(shí)候, Stream 就要進(jìn)行更新甚至進(jìn)行讀取。后者被稱為 Stateful Processing,前者為 Stateless Processing。那么如何進(jìn)行管理流處理的 states 呢?有兩個(gè)通用的方式,一個(gè)方式是 remote State,利用遠(yuǎn)程的數(shù)據(jù)庫(kù)或者遠(yuǎn)程的 key value store 存儲(chǔ)所有流處理的 states,每一次計(jì)算的時(shí)候,發(fā)送一個(gè)遠(yuǎn)程請(qǐng)求來(lái)讀取 states。遠(yuǎn)程請(qǐng)求的缺點(diǎn)在于需要進(jìn)行遠(yuǎn)程的請(qǐng)求和應(yīng)答。因?yàn)?states 存在于 Remove Sta
23、te 上,states 之間可能會(huì)有 overlation,不能很好做到 accesstion. 比如我是團(tuán)隊(duì) A,只負(fù)責(zé) sell,另外一個(gè)是團(tuán)隊(duì) B,只負(fù)責(zé) ajustment, 兩個(gè)不同的流有著不同的 job,但是 state 存在一起,所以兩者會(huì)相互影響;另外一個(gè)方式是 Local State,意味著所有的 state 和所有的處理單元是并發(fā)在一起的,每個(gè)單元上存著 state。在 Kafka Stream 里面,每個(gè)計(jì)算單元之間不需要有任何交互,state 之間亦如此。我們只要把 state 存到 Local 計(jì)算單元上就足矣。第一,可以保證 better isolation,它們之
24、間沒有任何的 access;第二,local state 可以做到更好的時(shí)效性,不需要遠(yuǎn)程讀寫。如上圖,在 Kafka 內(nèi)有 aggregateByKey()語(yǔ)句,類似于 Stateful Processing。當(dāng)用戶定義 Stateful Processing 的時(shí)候,在 Kafka Stream 庫(kù)內(nèi)部就會(huì)自動(dòng)生成 State Strom,且與 aggregate opprate 進(jìn)行連接,只有該 opprate 能夠?qū)υ?State Strom 進(jìn)行讀寫,因?yàn)槊總€(gè) opprate 有自己獨(dú)有的 State Strom,可達(dá)到 State Strom 完全 Local 化。當(dāng)我們有多個(gè)并發(fā)
25、流處理任務(wù)的時(shí)候,每個(gè)計(jì)算單元除了有一個(gè)自己的拓?fù)浣Y(jié)構(gòu)進(jìn)行計(jì)算之外,也有一份 State Store。每個(gè) State Strom 之間是存儲(chǔ)完全不相干的流處理信息和數(shù)據(jù)。接下來(lái)討論的是 Kafka Streams 里面另一個(gè)重要概念,流與數(shù)據(jù)庫(kù)表的關(guān)系?正如大家所看見的,在 Kafka Streams 內(nèi)部有兩種流 KStream 與 Ktable,那么什么叫做 KStream?什么叫做 Ktable 呢?在開發(fā) Kafka Streams 時(shí)的一個(gè)核心出發(fā)點(diǎn)是流和它所對(duì)應(yīng)的表或者數(shù)據(jù)庫(kù)的 State 彼此之間具有一一影射關(guān)系。為什么一一影射呢?舉個(gè)例子,假設(shè)你有一個(gè)上圖的數(shù)據(jù)流,該數(shù)據(jù)流代
26、表著某張表,即變量的日志或者更新日志。更新日志內(nèi)含有 Key 和 Valve,比如第三條的更新日志(key1,value3)其實(shí)正在更新第 1 日志(key1,value1)的新信息,換句話說,原本 key1 所對(duì)應(yīng)的是 value1,但是在這一時(shí)刻被改成對(duì)應(yīng) value3,如果我們重復(fù)更新該日志,我們能夠得到什么呢?我們可以得到該表在任意時(shí)間段內(nèi)的一個(gè)實(shí)時(shí)的可視化圖。同理,如果我們只有這樣一個(gè)表,并且正在不斷更新這個(gè)表,只要在每次更新時(shí)保留該日志,就能夠從表反推回該更新日志的數(shù)據(jù)流所應(yīng)的所有內(nèi)容,這就是流和表或者流和狀態(tài)之間的一一對(duì)應(yīng)關(guān)系??偠灾?,只要你有一個(gè)日志更新流,即可重構(gòu)回你表狀態(tài)
27、在任意時(shí)間內(nèi)的 value;如果你有一個(gè)表,也可以通過表的更新來(lái)找到該表所對(duì)應(yīng)的流。這就是我所說的 A Stream is a changelog of a table ;A table is a materialized view at tiome of a stream. 流和表具有對(duì)應(yīng)關(guān)系。這促使我們定義兩種不同的KStream 和 KTable。KStream 是很普通的數(shù)據(jù)流,在數(shù)據(jù)流之間不存在任何因果關(guān)系和邏輯關(guān)系,可以被認(rèn)為是 append only Stream。Typo 是更新日志流,每個(gè)日志里面相同的 key 所對(duì)應(yīng)的就是對(duì)表的更新。那么為什么要定義這兩種不同的數(shù)據(jù)流呢?我
28、舉個(gè)例子。如上圖,用戶購(gòu)買歷史記錄。比如 Alice 曾經(jīng)買過雞蛋和牛奶,雞蛋和牛奶這兩者之間不存在任何因果關(guān)系,Alice 買過牛奶只是在 Alice 買過雞蛋上很簡(jiǎn)單的增量。用戶雇傭狀態(tài)的更新日志,比如 Alice 曾經(jīng)在 LinkedIn 工作,之后信息被更新到 Alice 在微軟工作,現(xiàn)在 Alice 在微軟工作覆蓋了之前的工作信息。如果以當(dāng)前的時(shí)間狀態(tài)進(jìn)行解讀這兩個(gè)流,第一個(gè)流顯示的信息為 Alice 曾經(jīng)買過雞蛋,第二個(gè)流信息顯示為 Alice 在 LinkedIn 工作。如果將時(shí)間往前推,查看更新的數(shù)據(jù)流信息可以發(fā)現(xiàn),第一個(gè) KStream 顯示 Alice 買了雞蛋又買了牛奶;
29、但是在第二種情況下,Alice 并不是同時(shí)在 LinkedIn 和微軟工作,而是 Alice 已經(jīng)在微軟工作,不在 LinkedIn 工作了。為什么兩種不同的流有兩種定義呢?因?yàn)楫?dāng)你做相同操作的時(shí)候,比方說簡(jiǎn)單做一個(gè)合計(jì)操作,不同的流得出的結(jié)果是不一樣的。在上者,如果我們將時(shí)間往前推,可得出 Alice 的合計(jì)結(jié)果是 2+3;但是在下面,如果對(duì)其進(jìn)行 KTable 的 aggregate,顯示 Alice 的結(jié)果是將其原本數(shù)值 2 變成 3,而不是 +3 的關(guān)系。在 Kafka Stream 的 DSL 里面有多種不同的 aggregate,reduce 操作等, 不同的數(shù)據(jù)流可能將 KStr
30、eam 變成 KTable,也可能把 KTable 變回 KStream,在用戶定義如下不同的 operation 的時(shí)候,在后臺(tái)不同狀態(tài)的流可采用不同計(jì)算方式、計(jì)算模型。如上圖,KTable。當(dāng)一條新消息進(jìn)來(lái)時(shí)該如何進(jìn)行拓?fù)溆?jì)算呢?舉個(gè)例子,在該拓?fù)浣Y(jié)構(gòu)內(nèi),Stream2 出現(xiàn)了一個(gè)新的 record,即紅顏色標(biāo)記,該標(biāo)記與第一條 record 顏色相近,因?yàn)樗鼈兪峭瑐€(gè) key,不同 value。Stream2 和 Stream1 進(jìn)行 join 操作成為一個(gè)新的 record,該新 record 會(huì)被放入到 KStream joined 里面,然后 KStream joined 進(jìn)行 ag
31、gregate 操作,而 aggregate 操作得到的結(jié)果是 state 被更新,新 record 被 append 到 aggregate 流內(nèi),但是 append 操作將之前的紅顏色 record 復(fù)寫了,換句話說,因?yàn)橛辛嗽撔?record 的存在,之前紅顏色的 record 由于被復(fù)寫已經(jīng)不重要了。Kafka Stream 運(yùn)維 如果我們有一個(gè) fault,那么我們?nèi)绾卧?Kafka Stream 上做 fault tolerance?正如上文所提及的,Tables 和 Stream 之間存在一一影射關(guān)系,Kafka Stream 有效地利用了該特性。舉個(gè)例子,有個(gè) Kafka St
32、ream 的應(yīng)用業(yè)務(wù),該業(yè)務(wù)有三個(gè)并發(fā) task,每個(gè) task 有自己的 local state,每當(dāng) State 進(jìn)行更新時(shí),Kafka Stream 就會(huì)自動(dòng)將更新消息寫到更新日志內(nèi),更新日志也自動(dòng)生成。每更新一個(gè)狀態(tài)時(shí),消息日志就被更新該日志上。比如過了一段時(shí)間,中間的 task 壞掉了,那么 Kafka Stream 會(huì)做什么呢?首先它會(huì)檢測(cè)異常,自動(dòng)地在已有的 instance 上重新啟動(dòng)原本壞掉的 task,重新構(gòu)建 State,那么 State 怎么 build 呢?通過更新 changelog,直到 restore 整個(gè)原本正在進(jìn)行的狀態(tài)的 restoration,只有新狀態(tài)
33、被 restore 完整之后才能繼續(xù) task 同步計(jì)算。消息回溯也是類似的原理。比方說,某應(yīng)用已被運(yùn)行了很多年,發(fā)現(xiàn) stream 流處理計(jì)算里面存在 Bug,我們不得不將已計(jì)算的結(jié)果舍棄,回溯到一個(gè)更早的歷史時(shí)間重新進(jìn)行計(jì)算,即計(jì)算回溯。Reprocessing 在 Kafka Stream 也是一種簡(jiǎn)單的方式,當(dāng)我們達(dá)到某一個(gè)位移,比如位移 5,需要進(jìn)行消息回溯時(shí),用戶可以簡(jiǎn)單地起一個(gè)新的狀態(tài) -New State,該 State 完全沒有任何內(nèi)容,然后從最早的時(shí)間開始重新進(jìn)行計(jì)算,直到計(jì)算到趕上現(xiàn)有 task 時(shí)候。只需要 switch over 就可以完成消息回溯,且該整個(gè)消息回溯過
34、程不需要關(guān)閉整個(gè)流處理任務(wù)。于是很多人便問,那么 Kafka Stream 能不能支持 Streaming processing 呢?舉個(gè)例子,我不希望 Kafka Stream 一直在運(yùn)行,希望它可以每 6 個(gè)小時(shí) run 一次,并且每 run 一次可將當(dāng)前所有已累計(jì)的 Kafka massage 全部處理掉。這個(gè)操作也很簡(jiǎn)單,從 outsite A 開始,一直位移到 B 結(jié)束或者到 C 結(jié)束,表示已停止整個(gè)應(yīng)用;6 個(gè)小時(shí)之后當(dāng)它重啟的時(shí)候,再?gòu)男碌奈灰崎_始進(jìn)行下一段的位移,這是批處理計(jì)算結(jié)果,即從一個(gè) outsite 到另外一個(gè) outsite,緊接著是另外一個(gè) outsiteKafka
35、 Stream 通過位移的控制和管理進(jìn)行批處理結(jié)果,而不需要運(yùn)行整個(gè) Kafka Stream。時(shí)間的管理 時(shí)間管理是流處理上非常重要的觀念,同時(shí)也是區(qū)別于流處理和批量式處理非常重要的概念。很多人都已熟悉 Event Time 和 Processing Time 的區(qū)別,Event Time 是每個(gè)日志、消息、狀態(tài)發(fā)生的時(shí)候所發(fā)生的時(shí)間,而 Processing Time 是日志被計(jì)算和處理的時(shí)候所發(fā)生的時(shí)間。這兩者可能并不是完全融合的,可能存在位移,這便是所謂的時(shí)間延遲。如上圖,以星球大戰(zhàn)故事時(shí)間和拍攝時(shí)間為例。星球大戰(zhàn)有七步曲,Processing Time 是電影真正拍攝時(shí)間,是在現(xiàn)實(shí)生
36、活中的時(shí)間1999 年到 2015 年;但是拍攝時(shí)間和星球大戰(zhàn)所發(fā)生時(shí)間并不一一對(duì)應(yīng),存在延遲。對(duì)其做流處理時(shí)候可以發(fā)現(xiàn),類似 out of order 的現(xiàn)象很常見,比如因?yàn)閿?shù)據(jù)量太大而導(dǎo)致數(shù)據(jù)發(fā)生延遲,或者說數(shù)據(jù)處理發(fā)生了延遲等,都會(huì)發(fā)生延時(shí)情況。那么 Kafka Stream 怎么解決該問題呢? Kafka Stream 允許給每個(gè)日志定義時(shí)間戳,該時(shí)間戳可以是當(dāng)前系統(tǒng)時(shí)間,也可以是提取時(shí)間戳,也可以從當(dāng)前 record 被生成的時(shí)候所提取的時(shí)間戳,這些即被定義成 Event Time。類似的,如果 record 是一個(gè) Jason format,將其時(shí)間戳提取出來(lái)也可被定義成 Event Time。有如此時(shí)間戳,我們可以基于該時(shí)間戳進(jìn)行各式計(jì)算,比方說 Windowing
溫馨提示
- 1. 本站所有資源如無(wú)特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 南師大固體廢棄物處理教案01緒論
- 車輛贈(zèng)與及維修費(fèi)用支付合同范本
- 餐飲企業(yè)員工激勵(lì)與績(jī)效管理合同
- 2025屆內(nèi)蒙古自治區(qū)赤峰市翁牛特旗第一中學(xué)八年級(jí)英語(yǔ)第二學(xué)期期末經(jīng)典試題含答案
- 木材典當(dāng)借款合同范本
- 倉(cāng)儲(chǔ)租賃合同及倉(cāng)儲(chǔ)貨物管理服務(wù)協(xié)議
- 車牌租賃與車輛租賃市場(chǎng)調(diào)查合作協(xié)議書
- 保險(xiǎn)合同理賠補(bǔ)償協(xié)議
- 文化創(chuàng)意產(chǎn)業(yè)部分股權(quán)置換與品牌合作協(xié)議
- 燒烤店品牌合作承包經(jīng)營(yíng)合同范本
- 2025年人力資源管理期末考試試卷及答案
- 2025年不動(dòng)產(chǎn)登記代理人(地籍調(diào)查)考試真題卷(帶答案)
- 銀行安全用卡培訓(xùn)課件
- 改善患者就醫(yī)體驗(yàn)服務(wù)課件
- (高清版)DB50∕T 689-2016 合成鉆石鑒定技術(shù)規(guī)范
- 2025-2030中國(guó)智慧社區(qū)行業(yè)市場(chǎng)深度調(diào)研及前景趨勢(shì)與投資研究報(bào)告
- 初中生安全用電課件
- 2025年廣東省地理初中學(xué)業(yè)水平模擬練習(xí)卷(含答案)
- 心率測(cè)定-教學(xué)設(shè)計(jì)-八年級(jí)體育健康教育
- 2025年ps cs5操作試題及答案
- 2025年太陽(yáng)能空調(diào)系統(tǒng)合同
評(píng)論
0/150
提交評(píng)論