Apache Flink十大技術難點實戰_第1頁
Apache Flink十大技術難點實戰_第2頁
Apache Flink十大技術難點實戰_第3頁
Apache Flink十大技術難點實戰_第4頁
Apache Flink十大技術難點實戰_第5頁
已閱讀5頁,還剩189頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

AllibabaGroup阿里巴巴集團Flink社區微信公眾號102萬行代碼,1270個問題,Flink新版發布了什么?從開發到生產上線,如何確定集群規劃大小?11Demo:基于FlinkSQL構建流式應用FlinkCheckpoint問題排查實用指南如何分析及處理Flink反壓?FlinkonYARN(上一張圖輕松掌握基礎架構與啟動流程FlinkonYARN(下常見問題與排ApacheFlink與ApacheHive的集成72FlinkBatchSQL1.10實踐83如何在PyFlink1.10中自定義PythonUDF?90Flink1.10NativeKubernetes原理與實踐導讀:ApacheFlink是公認的新一代開源大數據計算引擎,可以支持流處理、批處理和機器學習等多種計算形態,也是Apache軟件基金會和GitHub社區最2019年1月,阿里巴巴實時計算團隊宣布將經過雙十一歷練和集團內部業務打磨的Blink引擎進行開源并向ApacheFlink貢獻代碼,此后的一年中,阿里巴巴實時計算團隊與ApacheFlink社區密切合作,持續推進Flink對Blink的整合。式完成了Blink向Flink的合并。在此基礎之上,Flink1.10版本在生產可用性、功能、性能上都有大幅提升。本文將詳細為大家介紹該版本的重大變更與新增特性。Flink1.10是迄今為止規模最大的一次版本升級,除標志著Blink的合并完成外,還實現了Flink作業的整體性能及穩定性的顯著優化、Flink1.10.0版本一共有218名貢獻者,解決了1270個JIRAissue,經由2661個commit總共提交了超過102萬行102萬行代碼,1270個問題,Flin其中阿里巴巴實時計算團隊共提交64.5萬行代碼,超過總代碼量的60%,做出在該版本中,Flink對SQL的DDL進行了增強,并實現了生產級別的Batch支持和Hive兼容,其中TPC-DS10T的性能更是達到了Hive3.0的7倍之多。在內核方面,對內存管理進行了優化。在生態方面,增加了PythonUDF和原生Kubernetes集成的支持。后續章節將在這些方面分別進行詳細介紹。在舊版本的Flink中,流處理和批處理的內存配置是割裂的,并置使用RocksDB存儲狀態數據時,很難限制其內存使用,從而在容器環境下經常出在1.10.0中,我們對TaskExecutor的內存模型,尤其是受管理內存(Man- 此外,我們還將RocksDBstatebackend使用的內存納入了托管范疇,同時可受控前的內存使用情況(share-slot)受控后的內存使用情況(share-slot)Batch兼容Hive且生產可用Flink從1.9.0版本開始支持Hive集成,但并未完全兼容。在1.10.0中我們對Hive兼容性做了進一步的增強,使其達到生產可用的標準。具體來說,Flink1.10Meta兼容-支持直接讀取Hivecatalog,覆蓋Hive1.x/2.x/3.x全部版本數據格式兼容-支持直接讀取Hive表,同時也支持寫成Hive表的格式;支UDF兼容-支持在FlinkSQL內直接調用Hive的UDF,UDTF和UDAF與此同時,1.10.0版本中對batch執行進行了進一步的優化(FLINK-14133),向量化讀取ORC(FLINK-14135)基于比例的彈性內存分配(FLIP-53)Shuffle的壓縮(FLINK-14845) 基于新調度框架的優化(FLINK-14735)在此基礎上將Flink作為計算引擎訪問Hive的meta和數據,在TPC-DS10Tbenchmark下性能達到HivSQLDDL增強Flink1.10.0支持在SQL建表語句中定義watermark和計算列,以water-)PythonUDF支持Flink從1.9.0版本開始增加了對Python的支持(PyFlink但用戶只能使用102萬行代碼,1270個問題,FlinkJava開發的User-defined-function(UDF),具有一定的局限性。在1.10.0中我們為PyFlink增加了原生UDF支持(FLIP-58用戶現在可以在TableAPI/SQLhttps://enjoyment.cool/2020/02/19/Deep-dive-how-to-support-Python-UDF-in-Apache-Flink-1-10/原生Kubernetes集成Kubernetes(K8S)是目前最為流行的容器編排系統,也是目前最流行的容器化需要對容器、算子及kubectl等K8S命令有所了解。在Flink1.10中,我們推出了對K8S環境的原生支持(FLINK-9953Flink 的資源管理器會主動和Kubernetes通信,按需申請/projects/flink/flink-docs-stable/release-notes/flink-1.10.html2019年1月,阿里巴巴實時計算團隊宣布Blink開源。整整一年之后,Flink1.10.0版本的發布宣告Flink和Blink的整合正式完成。我們踐行著自己的諾言放源碼,更相信社區的力量,相信社區是開源協作精神與創新的搖籃。我們也衷心希望有更多的志同道合的小伙伴加入我們,一起把A從開發到生產上線,如何確定集群規劃大小?在Flink社區中,最常被問到的問題之一是:在從開發到生產上線的過程中如何確定集群的大小。這個問題的標準答案顯然是“視情況而定”,但這并非一個有用的答案。本文概述了一系列的相關問題,通過回答這些問題,或許你能得出一些數字作每秒記錄數和每條記錄的大小狀態更新的次數和狀態后端的訪問模式最后,一個更實際的問題是與客戶之間圍繞停機時間、延遲和最大吞吐量的服務網絡容量,同時把使用網絡的外部服務也納入考慮,如Kafka、HDFS等。磁盤帶寬,如果您依賴于基于磁盤的狀態后端,如RocksDB(并考慮其他磁盤使用,如Kafka或HDFS)可用的機器數量、CPU和內存基于所有這些因素,現在可以為正常運行構建一個基線,外加一個資源緩沖量用于恢復追趕或處理負載尖峰。建議您在建立基線時也考慮檢查點期間(checkpoint- 當前在假設的集群上計劃作業部署,將建立資源使用基線的過程可視化。這些數字是粗略的值,它們并不全面——在文章的最后將進一步說明在進行計算過程中遺漏Flink流計算作業和硬件示例在本案例中,我將部署一個典型的Flink流處理作業,該作業使用Flink的Kafka數據消費者從Kafka消息源中讀取數據。然后使用帶鍵的總計窗口運算符假設吞吐量為每秒100萬條消息。要了解窗口運算符(windowoperator)的狀態大小,需要知道不同鍵的數目。在本例中,鍵(keys)是用戶id的數量,即500000000個不同的用戶。對于每個用戶,需要計算四個數字,存儲為長整形(8如上圖所示,共有五臺機器在運行作業,每臺機器運行一個Flink任務管理器機到運行TaskManager的每臺計算機都由一個10千兆位以太網連接。Kafka緩存代理(brokers)在不同的機器上分開運行。每臺機器有16個CPU核。為了簡化處理,不考慮CPU和內存需求。但實際情況中,根據應用程序邏輯和正在使用的狀態后端,我們需要注意內存。這個例子使用了一個基于RocksDB的狀態后端,它穩定并且內存需求很低。要了解整個作業部署的資源需求,最容易的方法是先關注一臺計算機和一個 14>從開發到生產上線,如何確定集群規機器視角圖-TaskManagern從上圖來看,keyBy是一個單獨運算符,因此計算資源需求更容易。實際上,TheKafkasource要計算單個Kafka源(source)接收的數據量,我們首先計算K入。這些source每秒接收1000000條消息,每條消息大小為2KB。2KBx1,000,000/s=2GB/s2GB/s÷5臺機器=400MB/sKafkasource的計算過程TheShuffle/keyByShuffle過程將具有相同鍵的所有數據發送到一臺計算機,因此需要將來自400MB/s÷5臺機器=80MB/s平均而言,我們必須向每臺計算機發送80MB/s的數據。此分析是從一臺機器 16>從開發到生產上線,如何確定集群規400MB/s-80MB=320MB/sTheshuffle的計算過程Window窗口輸出和Kafka發送下一個要問的問題是窗口運算符發出多少數據并發送到Kafka接收器。答案是分鐘發出一次當前聚合總值。每個鍵從聚合中發出2個整形(user_id,window_ts)100000000個keysx40個字節=4GB(從每臺機器來看)4GB/分鐘÷60=67MB/秒(由每個任務管理器發出)這意味著每個任務管理器平均從窗口運算符發出67MB/s的用戶數據。由于每個任務管理器上都有一個Kafka發送端(和窗口運算符在同一個任務管理器中并且沒有進一步的重新分區,所以這得到的是Flink向K用戶數據:從Kafka,分發到窗口運算符并返回到Kafka窗口運算器的數據發射預計將是“突發”的,因為它們每分鐘發送一次數據。實際上,運算符不會以67mb/s的恒定速率給客戶發送數據,而是每分鐘內將可用帶 18>從開發到生產上線,如何確定集群規狀態訪問和檢查點實際情況中需要計入從磁盤訪問的開銷,包括到RocksDB的存儲狀態和檢查點。要了解磁盤訪問成本,請查看窗口運算符(windowoperator)如何訪問狀態。Kafka看。Flink正在用1分鐘的滑動窗口計算5分鐘的窗口量。Flink通過維護五個窗口來實現滑動窗口,每次滑動都對應一個1分鐘的窗口。如前所述,當使用窗口實現即時聚合時,將為每個窗口中的每個鍵(key)維護40字節的狀態。對于每個傳入事件,首先需要從磁盤檢索當前聚合值(讀取40字節更新聚合值,然后將新值寫回40字節狀態x5個窗口x每臺計算機200000msg/s=40MB/s即需要的每臺計算機的讀或寫磁盤訪問權限。如前所述,磁盤是網絡相互連接上述考慮是針對狀態訪問的,當新事件到達窗口運算符時,狀態訪問會持續進行,還需要容錯啟用檢查點。如果機器或其他部分出現故障,需要恢復窗口內容并繼檢查點設置為每分鐘一個檢查點,每個檢查點將作業的整個狀態復制到網絡連接 40字節狀態x5個窗口x100000000個keys=20GB20GB÷60=333MB/秒與窗口運算類似,檢查點是突發的,每分鐘一次,它都試圖將數據全速發送到外部存儲器。Checkpointing引發對RocksDB的額外狀態訪問(在本案例中,RocksDB位于網絡連接的磁盤上)。自Flink1.3版本以來,Rock760+760x5+400+2335=10335MB/秒從開發到生產上線,如何確定集群規劃大小?<2補充一點,這些計算都不包括協議開銷,例如來自Flink、Kafka或文件系統的TCP、Ethernet和RPC調用。但這仍然是一個很好的出發點,可以幫助您了解工基于以上分析,這個例子,在一個5節點集群的典型運行中,每臺機器都需要保留了大約40%的網絡容量因為部分被主觀對于40%的凈空是否合適,沒有一個一刀切的答案,但是這個算法應該是一個很好的起點。嘗試上面的計算,更換機器數量、鍵(keys)的數量或每秒的消息數,Demo:基于FlinkSQL構建流式應用上周四在Flink中文社區釘釘群中直播分享了《Demo:基于FlinkSQL構建流式應用》,直播內容偏向實戰演示。這篇文章是對直播內容的一個總結,并且改善了部分內容,比如除Flink外其他組件全部采用DockerCompose安裝流程。讀者也可以結合視頻和本文一起學習。完整分享可以觀看視頻回顧:https://Flink1.10.0于近期剛發布,釋放了許多令人激動的新特性。尤其是FlinkSQL模塊,發展速度非常快,因此本文特意從實踐的角度出發,帶領大家一起探索使用FlinkSQL如何快速構建流式應用。本文將基于Kafka,MySQL,Elasticsearch,Kibana,使用FlinkSQL構建一個電商用戶行為的實時分析應用。本文所有的實戰演練都將在FlinkSQLCLI上執行,全程只涉及SQL純文本,無需一行Java/Scala代碼,無需安裝IDE。本實戰一臺裝有Docker和Java8的Linux或MacOS計算機。使用DockerCompose啟動容器本實戰演示所依賴的組件全都編排到了容器中,因此可以通過docker-com-DataGen:數據生成器。容器啟動后會自動開始生成用戶行為數據,并發送到Kafka集群中。默認每秒生成1000條數據,持續生成約3小時。也可以更改 MySQL:集成了MySQL5.7,以及預先創建好了類目表(category預先Kafka:主要用作數據源。DataGen組件會自動將數據灌入這個容器中。Zookeeper:Kafka容器依賴。Elasticsearch:主要存儲FlinkSQL產出的數據。Kibana:可視化Elasticsearch中的數據。在啟動容器前,建議修改Docker的配置,將資源調整到4GB以及4核。啟動該命令會以detached模式自動啟動DockerCompose配置中定義的所有容器。你可以通過dockerps來觀察上述的五個容器是否正常啟動了。也可以訪問下載安裝Flink本地集群/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz3.通過如下命令下載依賴jar包,并拷貝到lib/目錄下,也可手動下載和拷 6.執行bin/sql-client.shembedded啟動SQLCLI。便會看到如下的 使用DDL創建Kafka表Datagen容器在啟動后會往Kafka的user_behaviortopic中持續不斷地寫入數據。數據包含了2017年11月27日一天的用戶加購、喜歡每一行表示一條用戶行為,以JSON的格式由用戶ID、商品ID、商品類目ID、行為類型和時間組成。該原始數據集來自阿里云天池公開數據集,特此我們可以在docker-compose.yml所在目錄下運行如下命令,查看Kafka集FlinkSQLCLI中執行該DDL。如上我們按照數據的格式聲明了5個字段,除此之外,我們還通過計算列語法和PROCTIME()內置函數聲明了一個產生處理時間的虛擬列。我們還通過WATERMARK語法,在ts字段上聲明了watermark策略(容忍5秒亂序ts字段因此也成了事件時間列。關于時間屬性以及DDL語法可以閱讀官方文檔了解 時間屬性:/projects/flink/flink-docs-release-1.10/DDL:/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-在SQLCLI中成功創建Kafka表后,可以通過showtables;和describeuser_behavior;來查看目前已注冊的表,以及表的詳細信息。我們也可以直接在SQLCLI中運行SELECT*FROMuser_behavior;預覽下數據(按q退出)。接下來,我們會通過三個實戰場景來更深入地了解FlinkSQL。使用DDL創建Elasticsearch表我們先在SQLCLI中創建一個ES結果表,根據場景需求主要需要保存兩個數k提交Query統計每小時的成交量就是每小時共有多少“buy”的用戶行為。因此會需要用到TUMBLE窗口函數,按照一小時切窗。然后每個窗口分別統計“buy”的個數,這可這里我們使用HOUR內置函數,從一個TIMESTAMP列中提取出一天中第幾個小時的值。使用了INSERTINTO將query的結果持續不斷地插入到上文定義的es結果表中(可以將es結果表理解成query的物化視圖)。另外可以閱讀該文檔了解更多關于窗口聚合的內容:/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows在FlinkSQLCLI中運行上述查詢后,在FlinkWebUI中就能看到提交的任 使用Kibana可視化結果localhost:5601訪問Kibana。首先我們需要先配置一個indexpattern。點擊左接下來,我們先創建一個Dashboard用來展示各個可視化的視圖。點擊頁面左擊“CreateNew”創建一個新的視圖,選擇“Area”面積圖,選擇“buy_cnt_per_hour”索引,按照如下截圖中的配置(左側)畫出成交量面積圖,并保另一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv也就是每一刻的uv數都代表從0點到當前時刻為止的總計uv數,因此該曲線肯定是單調遞我們仍然先在SQLCLI中創建一個Elasticsearch表,用于存儲結果匯總數 為了實現該曲線,我們可以先通過OVERWINDOW計算出每條數據的當前通過內置的COUNT(DISTINCTuser_id)來完成,FlinkSQL內部對COUNTDISTINCT做了非常多的優化,因此可以放心使用。這里我們使用SUBSTR和DATE_FORMAT還有||內置函數,將一個TIME-STAMP字段轉換成了10分鐘單位的時間字符串,如:12:10,12:20。關于ERWINDOW的更多內容可以參考文檔:/projects//flink-docs-release-1.10/dev/table/sql/queries.html#aggregations我們還使用了CREATEVIEW語法將query注冊成了一個邏輯視圖,可以方便地在后續查詢中對該query進行引用,這有利于拆解復雜query。注意,創建邏輯視圖不會觸發作業的執行,視圖的結果也不會落地,因此使用起來非常輕量,沒有額外開銷。由于uv_per_10min每條輸入數據都產生一條輸出數據,因此對于存儲分鐘只有一個點會存儲在Elasticsearch中,對于Elasticsearch和Kibana可視化最后一個有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過由能將其歸約到頂級類目。所以筆者在mysql容器中預先準備了子類目與頂級類目的在SQLCLI中創建MySQL表,后續用作維表查詢。 同時我們再創建一個Elasticsearch表,用于存儲類目統計結果。第一步我們通過維表關聯,補全類目名稱。我們仍然使用CREATEVIEW將該查詢注冊成一個視圖,簡化邏輯。維表關聯使用temp/table/streaming/joins.html#join-with-a-temporal-table最后根據類目名稱分組,統計出buy的事件數,并寫入Elasticsearch中。提交上述查詢后,在Kibana中創建to到目前為止,我們已經完成了三個實戰案例及其可視化視圖。現在可以回到Dashboard頁面,對各個視圖進行拖拽編排,讓我們的Dashboard看上去更加正項,而用戶行為數據中也有很多有意思的信息值得挖掘,感興趣的讀者可以用FlinkSQL對數據進行更多維度的分析,并使用Kibana展示更多可視化圖,并觀測圖形在本文中,我們展示了如何使用FlinkSQL集成Kafka,MySQL,Elastic- search以及Kibana來快速搭建一個實時分析應用。整個過程無需一行Java/Scala代碼,使用SQL純文本即可完成。期望通過本文,可以讓讀者了解到FlinkSQL的易用和強大,包括輕松連接各種外部系統、對事件時間和亂序數據處理的原生支持、維表關聯、豐富的內置函數等等。希望你能喜歡我們的實戰演練,并從中獲得樂趣和Flink會從最近成功的Checkpoint恢復。在實際情況中,我們可能會遇到Check-1.Checkpoint流程簡介 Source收到triggercheckpoint的PRC,自己開始做snapshot,并往下游Task開始同步階段snapshotTask開始異步階段snapshotTasksnapshot完成,匯報給JM2.Checkpoint異常情況排查2.1Checkpoint失敗其中Acknowledged一列表示有多少個subtask對這個Checkpoint進行了ack,從圖中我們可以知道第三個operator總共有5個subtask,但是只有4個進行了ack;第二列LatestAcknowledgement表示該operator的StateSize表示當前Checkpoint的state大小--主要這里如果是增量 Checkpoint失敗大致分為兩種情況:CheckpointDecline和Checkpoint0b60f08bf8984085b59f8d9bc74ce2e1是executionid,85d268e6fbc-當前Flink中如果較小的Checkpoint還沒有對齊的情況下,收到了更大的這個日志表示,當前Checkpoint19還在對齊階段,我們收到了Checkpoint在下游task收到被cancelBarrier的時候,會打印類似如下的日志:上面三種日志都表示當前task接收到上游發送過來的barrierCancel消息,從如果Checkpoint做的非常慢,超過了timeout還沒有完成,則整個Check-point也會失敗。當一個Checkpoint由于超時而失敗是表示Chekpoint1由于超時而失敗,這個時候可以可以看這個日志后面是否有 可以按照2.1.1中的方法找到對應的taskmanager.log查看具體信息。下面的日志如果是DEBUG的話,我們會在開始處標記DEBUG我們按照下面的日志把TM端的snapshot分為三個階段,開始做snapshot上面的日志表示當前這個backend的同步階段完成,共使用了上面的日志表示異步階段完成,異步階段使用了369ms2.2Checkpoint慢在2.1節中,我們介紹了Checkpoint失敗的排查思路,本節會分情況介紹Checkpoint經常需要做9分鐘(我們希望1分鐘左右就能夠做完而且我們預期statesize不是非常大。這個一般發生較少,但是也有可能,因為source做snapshot并往下游發送point,其中全量Checkpoint會把當前的state全部備份一次到持久化存儲,而 現在Flink中僅在RocksDBStateBackend中支持增量Checkpoint,如果你已經使用RocksDBStateBackend,可以通過開啟增量Checkpoint來加速,具體我們知道task僅在接受到所有的barrier之后才會進行snapshot,如果作業存在反壓,或者有數據傾斜,則會導致全部的channel或者某些channel的barrier上圖中我們選擇了一個task,查看所有subtask的反壓情上圖中我們選擇其中一個operator,點擊所有的subtask,然后按照RecordsReceived/BytesReceived/TPS從大到小進行排序,能看到前面幾個subtask會如果存在反壓或者數據傾斜的情況,我們需要首先解決反壓或者數據傾斜問題之從前面我們知道Checkpoint在task端分為barrier對齊(收齊所有上游發送如果taskmanager.log中沒有這個日志,則表示barrier一直沒有對齊,接下來我們需要了解哪些上游的barrier沒有發送下來,如果你使用AtLeastOnce的表示該task收到了channel5來的barrier,然后看對應Checkpoint,再查看還剩哪些上游的barrier沒有接受到,對于ExactlyOnce暫時沒有類似的日志,可在task端,所有的處理都是單線程的,數據處理和barrier處理都由主線程處理,如果主線程在處理太慢(比如使用RocksDBBackend,state操作慢導致整體 2.使用工具AsyncProfiledump一份火焰圖,查看占用CPU最多的棧;同步階段一般不會太慢,但是如果我們通過日志發現同步階段比較慢的話,對于非RocksDBBackend我們可以考慮查看是否開啟了異步snapshot,如果開啟了異步snapshot還是慢,需要看整個JVM在干嘛,也可以使用前一節中的工具。對于RocksDB開始snapshot的日志如下:Backend來說,主要瓶頸來自于網絡,這個階段可以考慮觀察網絡的metric,或者對于RocksDB來說,則需要從本地讀取文件,寫入到遠程的持久化存儲上,所以不僅需要考慮網絡的瓶頸,還需要考慮本地磁盤的性能。另外對于RocksDB-Backend來說,如果覺得網絡流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考3.總結在第二部分內容中,我們介紹了官方編譯的包的常情況的主要場景,以及相應的排查方法,如果排查了上面所有的情況,還是沒有發上文提到的一些DEBUG日志,如果flinkdist包是自己編譯的話,則建議將Checkpoint整個步驟內的一些DEBUG改為INFO,能夠通過日志了解整個[1]Changethreading-modelinStreamTasktoamailbox-basedapproach[3]RocksDBStateBackend多線程上傳State問題。反壓意味著數據管道中某個節點成為瓶頸,處理速率跟不上上游發送數據的速率,而需要對上游進行限速。由于實時計算應用通常使用消息隊列來進行生產端和消費端的解耦,消費端數據源是pull-based的,所以反壓通常是從某個節點傳導至數關于Flink的反壓機制,網上已經有不少博客介紹,中文博客推薦這兩篇1。簡單來說,Flink拓撲中每個節點(Task)間的數據都以阻塞隊列的方式傳輸,下游來不及消費導致隊列被占滿后,上游的生產也會被阻塞,最終導致數據源的攝入被阻塞。而本文將著重結合官方的博客[4]分享筆者在實踐中分析和處理Flink反壓反壓并不會直接影響作業的可用性,它表明作業處于亞健康的狀態,有潛在的性能瓶頸并可能導致更大的數據處理延遲。通常來說,對于一些對延遲要求不太高或者數據量比較小的應用來說,反壓的影響可能并不明顯,然而對于規模比較大的Flink前者是因為checkpointbarrier是不會越過普通數據的,數據處理后者是因為為保證EOS(Exactly-Once-Semantics,準確一次對于有接受到較快的輸入管道的barrier后,它后面數據會被緩存起來但不處理,直這兩個影響對于生產環境的作業來說是十分危險的,因為checkpoint是大小同樣可能拖慢checkpoint甚至導致OOM(使用Heap-basedStateBackend)或者物理內存使用超出容器資源(使用RocksDBStateBackend)的穩定性問題。因此,我們在生產中要盡量避免出現反壓的情況(順帶一提,為了緩解反壓給checkpoint造成的壓力,社區提出了FLIP-76:UnalignedCheckpoints[4]來解耦要解決反壓首先要做的是定位到造成反壓的節點,這主要有兩種辦法:2.通過FlinkTaskMetrics。前者比較容易上手,適合簡單分析,后者則提供了更加豐富的信息,適合用于監控系統。因為反壓會向上游傳導,這兩種方式都要求我們從Source節點到Sink的FlinkWebUI的反壓監控提供了SubTask級別的反壓監控,原理是通過周期性對Task線程的棧信息采樣,得到線程被阻塞在請求Buff塞)的頻率來判斷該節點是否處于反壓狀態。默認配置 OK,0.1至0.5為LOW,而超過0.5則為HIGH。1.該節點的發送速率跟不上它的產生數據速率。這一般會發生在一條輸入多條2.下游的節點接受速率較慢,通過反壓機制限制了該節點的發如果是第一種狀況,那么該節點則為反壓的根源節點,它是從SourceTask到值得注意的是,反壓的根源節點并不一定會在反壓面板體現出高反壓,因為反壓面板監控的是發送端,如果某個節點是性能瓶頸并不會導致它本身出現高反壓,而是導致它的上游出現高反壓。總體來看,如果我們找到第一個出現反壓的節點,那么反那么如果區分這兩種狀態呢?很遺憾只通過反壓面板是無法直接判斷的,我們還很大,由于要采集所有Task的棧信息,反壓面板的壓力也會很大甚至不可用。TaskMetricsFlink提供的TaskMetrics是更好的反壓監控手段,但也要求更加豐富的背景TaskManager傳輸數據時,不同的TaskManager上的兩個Subtask間通常根據key的數量有多個Channel,這些Channel會復用同一個TaskManager級別的TCP鏈接,并且共享接收端Subtask級別的BufferPool。在接收端,每個Channel在初始階段會被分配固定數量的ExclusiveBuffer,這些Buffer會被用于存儲接受到的數據,交給Operator使用后再次被釋放。Channel接收端空閑的Buffer數量稱為Credit,Credit會被定時同步給發送端被后在流量較大時,Channel的ExclusiveBuffer可能會被寫滿,此時Flink會向個Channel需要就去哪里。而在Channel發送端,一個Subtask所有的Channel會共享同一個BufferPool,這邊就沒有區分ExclusiveBuffer和FloatingBuffer。 圖2FlinkCredit-Based網絡我們在監控反壓時會用到的Metrics主要和Channel接受端的Buffer使用率有關,最為有用的是以下幾個Metrics:MetrisoutPoolUsage發送端Buffer的使用率inPoolUsage接收端Buffer的使用率exclusiveBuffersUsage(1.9以上)接收端ExclusiveBuffer的使用率其中inPoolUsage等于floatingBuffersUsage與exclusiveBuffersUsage的反壓傳導至上游。反壓情況可以根據以下表格進行對號入outPoolUsage和inPoolUsage同為低或同為高分別表明當前Subtask正常或處于被下游反壓,這應該沒有太多疑問。而比較有趣的是當outPoolUsage和inPoolUsage表現不同時,這可能是出于反壓傳導的中間狀態或者表明該Subtask如果一個Subtask的outPoolUsage是高,通常是被下游Task所影響,所以可以排查它本身是反壓根源的可能性。如果一個Subtask的outPoolUsage是低,但其inPoolUsage是高,則表明它有可能是反壓的根源。因為通常反壓會傳導至其上游,導致上游某些Subtask的outPoolUsage為高,我們可以根據這點來進一步判斷。值得注意的是,反壓有時是短暫的且影響不大,比如來自某個Channel的短暫網絡延遲或者TaskManager的正常GC,這種情況下我們可以不用處理。對于Flink1.9及以上版本,除了上述的表格,我們還可以根據floatingBsUsage/exclusiveBuffersUsage以及其上游Task的outPoolUsage來進行進一步的分析一個Subtask和其上游Subtask的數據傳輸。 BuffersUsage則表明了反壓是否存在傾斜(floatingBuffersUsage高、exclusive-至此,我們已經有比較豐富的手段定位反壓的根源是出現在哪個節點,但是具體定位到反壓節點后,分析造成原因的辦法和我們分析一個普通程序的性能瓶頸的辦法是十分類似的,可能還要更簡單一點,因為我們要觀察的主要是TaskThread。在實踐中,很多情況下的反壓是由于數據傾斜造成的,這點我們可以通過WebUI各個SubTask的RecordsSent和RecordReceived來確認,另外Check-pointdetail里不同SubTask的Statesize也是一個分析數據傾斜的有用指標。此外,最常見的問題可能是用戶代碼的執行效率問題(頻繁被阻塞或者性能問題)。最有用的辦法就是對TaskManager進行CPUprofile,從中我們可以分析到TaskThread是否跑滿一個CPU核:如果是的話要分析CPU主要花費在哪些函數里面,比如我們生產環境中就偶爾遇到卡在Regex的用戶函數(ReDoS如果不是的話要看TaskThread阻塞在哪里,可能是用戶函數本身有些同步的調用,可能是checkpoint或者GC等系統活動導致的暫時系統暫停。當然,性能分析的結果也可能是正常的,只是作業申請的資源不足而導致了反壓,這就通常要求拓展并行度。值得一提的,在未來的版本Flink將會直接在WebUI提供JVM的CPU火焰圖[5],這將大大簡化性能瓶頸的分析。另外TaskManager的內存以及GC問題也可能會導致反壓,包括Task-ManagerJVM各區內存不合理導致的頻繁FullGC甚至失聯。推薦可以通過給TaskManager啟用G1垃圾回收器來優化GC,并加上-XX:+PrintGCDetails來打印GC日志的方式來觀察GC的問題。反壓是Flink應用運維中常見的問題,它不僅意味著性能瓶頸還可能導致作業的不穩定性。定位反壓可以從WebUI的反壓監控面板和TaskMetric兩者入手,前者方便簡單分析,后者適合深入挖掘。定位到反壓節點后我們可以通過數據分布、CPUProfile和GC指標日志等手段來進一步分析反壓背后的具體原因并進行針對性2.一文徹底搞懂Flink網絡流控與反壓機制3.Flink輕量級異步快照ABS實現原理4.FlinkNetworkStackVol.2:Monitoring,Metrics,andthatBackpressureThing5.SupportforCPUFlameGraphsinnewwebUIFlinkonYARN(上Flink支持Standalone獨立部署和YARN、Kubernetes、Mesos等集群部署模式,其中YARN集群部署模式在國內的應用越來越廣泛。Flink社區將推出FlinkonYARN應用解讀系列文章,分為上、下兩篇。本文基于FLIP-6重構后的資源調度模型將介紹FlinkonYARN應用啟動全流程,并進行詳細步驟解析。下篇將根據社區大群反饋,解答客戶端和FlinkCluster的常見問題,分享相關問題的排查FlinkonYARN流程圖FlinkonYARN集群部署模式涉及YARN和Flink兩大開源框架,應用啟動流程的很多環節交織在一起,為了便于大家理解,在一張圖上畫出了FlinkonYARN基礎架構和應用啟動全流程,并對關鍵角色和流程進行了介紹說明,整個啟動流程又標注為橙色)兩個階段分別闡述,由于分支和細節太多,本文會忽略掉一些,只介紹YARN(上一張圖輕松掌握基礎架構與啟動流1.執行命令:bin/flinkrun-d-myarn-cluster...或bin/yarn-session.sh...來提交per-job運行模式或session運行模式的應用;2.解析命令參數項并初始化,啟動指定運行模式,如果是per-job運行模式將如果可以從命令行參數(-yid)或YARNproperties臨時文件(${java.io.tmp-dir}/.yarn-properties-${})中獲否則當命令行參數中包含-d(表示detached模式)和-myarn-clus否則當命令行參數項不包含-yq(表示查詢YARN集群可用資源)時,啟動session運行模式;3.獲取YARN集群信息、新應用ID并啟動運行前檢查;通過YarnClient向YARNResourceManager(下文縮寫為:YARNRM,YARNMaster節點,負責整個集群資源的管理和調度)請求創建一個新應用(YARNRM收到創建應用請求后生成新應用ID和container申請的 資源上限后返回并且獲取YARNSlave節點報告(YARNRM返回全部slave節點的ID、狀態、rack、httpflinkJobManager/TaskManagervcores資源申請需求;(3)指定queue是否存在(不存在也只是打印WARN信息,后續向YARN提交時排除異常并退出);(4)當預期應用申請的Container資源會超出YARN資源限制時拋出異常并退出;(5)當預期應用申請不能被滿足時(例如總資源超出YARN集群可4.將應用配置(flink-conf.yaml、logback.xml、perties)和相關文件(flinkjars、shipfiles、userjars、jobgraph等)上傳至分布如HDFS)的應用暫存目錄(/user/${}/.flink/);5.準備應用提交上下文(ApplicationSubmissionContext,包括應用的名classpath、資源大小等),注冊處理部署失敗的shutdownhook(清理應用對應的HDFS目錄然后通過YarnClient向YARNRM提交應用;6.循環等待直到應用狀態為RUNNING,包含兩個階段:循環等待應用提交成功(SUBMITTED默認每隔200ms通過YarnClient獲取應用報告,如果應用狀態不是NEW和NEW_SAVING則認為提交成功并退出循環,每循環10次會將當前的應用狀態輸出至日志:"Application循環等待應用正常運行(RUNNING每隔250ms通過YarnClient獲取應hasbeendeployedsuccessfully."并退出循環,如果等到的是非預期狀態如FAILED/FINISHED/KILLED,就會在輸出YARN返回的診斷信息("TheYARNapplicationunexpectedlyswitchedtostateduringdeployment.DiagnosticsfromYARN:...")之后拋出異常并退出。FlinkCluster啟動流程1.YARNRM中的ClientRMService(為普通用戶提供的RPC服務組件,處理來自客戶端的各種RPC請求,比如查詢YARN集群信息,提交、終止應用等)接收到應用提交請求,簡單校驗后將請求轉交給RMAppMan-ager(YARNRM內部管理應用生命周期的組件2.RMAppManager根據應用提交上下文內容創建初始狀態為NEW的應用,將應用狀態持久化到RM狀態存儲服務(例如ZooKeeper集群,RM狀態存儲服務用來保證RM重啟、HA切換或發生故障后集群應用能夠正常恢復,后續流程中的涉及狀態存儲時不再贅述應用狀態變為NEW_SAVING;3.應用狀態存儲完成后,應用狀態變為SUBMITTED;RMAppManager開始向ResourceScheduler(YARNRM可拔插資源調度器,YARN自帶三種調度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最廣泛,FifoScheduler功能最簡單基本不可用,今年社區已明確不再繼續支持FairScheduler,建議已有用戶不是葉子隊列、隊列已停用、超出隊列最大應用數限制等)則拋出拒絕該應用,應用狀態先變為FINAL_SAVING觸發應用狀態存儲流程并在完成后變為FAILED;如果提交成功,應用狀態變為ACCEPTED;4.開始創建應用運行實例(ApplicationAttempt,由于一次運行實例中最重要的組件是ApplicationMaster,下文簡稱AM,它的狀態代表了Applica-5.初始化應用運行實例信息,并向ApplicationMasterService(AM&RM協 議接口服務,處理來自AM的請求,主要包括注冊和心跳)注冊,應用實例狀態變為SUBMITTED;6.RMAppManager維護的應用實例開始初始化AM資源申請信息并重新校驗隊列,然后向ResourceScheduler申請AMContainer(Container是YARN中資源的抽象,包含了內存、CPU等多維度資源應用實例狀態變為ACCEPTED;7.ResourceScheduler會根據優先級(隊列/應用/請求每個維度都有優先級配置)從根隊列開始層層遞進,先后選擇當前優先級最高的子隊列、應用直至具體某個請求,然后結合集群資源分布等情況作出分配決策,AMContainer分配成功后,應用實例狀態變為ALLOCATED_SAVING,并觸發應用實例狀態存儲流程,存儲成功后應用實例狀態變為ALLOCATED;8.RMAppManager維護的應用實例開始通知ApplicationMasterLauncher(AM生命周期管理服務,負責啟動或清理AMcontainer)啟動AMcon-tainer,ApplicationMasterLauncher與YARN稱YARNNM,與YARNRM保持通信,負責管理單個節點上的全部資源、Container生命周期、附屬服務等,監控節點健康狀況和Container資源使9.ContainerManager(YARNNM核心組件,管理所有Container的生命周期)接收到AMcontainer啟動請求,YARNNM開始校驗ContainerToken及資源文件,創建應用實例和Container實例并存儲至本地,結果返回后應用實例狀態變為LAUNCHED;10.ResourceLocalizationService(資源本地化服務,負責Container所需資源的本地化。它能夠按照描述從HDFS上下載Container所需的文件資源,并盡量將它們分攤到各個磁盤上以防止出現訪問熱點)初始化各種服務組件、創建工作目錄、從HDFS下載運行所需的各種資源至Container工作目錄(路徑為:${yarn.nodemanager.local-dirs}/usercache/${user}/11.ContainersLauncher(負責container的具體操作,包括啟動、重啟、恢復和清理等)將待運行Container所需的環境變量和運行命令寫到Container工作目錄下的launch_container.sh腳本中,然后運行該腳本12.Container進程加載并運行ClusterEntrypoint(FlinkJobManager入口類,每種集群部署模式和應用運行模式都有相應的實現,例如在YARN集群部署模式下,per-job應用運行模式實現類是YarnJobClusterEntrypoint,session應用運行模式實現類是YarnSessionClusterEntrypoint),首先初輸出各軟件版本及運行環境信息、命令行參數項、classpath等信息;注冊處理各種SIGNAL的handler:記錄到日志注冊JVM關閉保障的shutdownhook:避免JVM退出時被其他shutdown打印YARN運行環境信息:用戶名初始化文件系統創建并啟動各類內部服務(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)將RPCaddress和port更新到flinkconf配置13.啟動ResourceManager(Flink資源管理核心組件,包含YarnRe-sourceManager和SlotManager兩個子組件,YarnResourceManagerTaskManager資源,注銷應用等;SlotManager則負責內部資源管理,維護全部Slot信息和狀態)及相關服務,創建異步AMRMClient,開始注冊AM,注冊成功后每隔一段時間(心跳間隔配置項:${er-val},默認5s)向YARNRM發送心跳來發送資源更新請求和接受資源變更結果。YARNRM內部該應用和應用運行實例的狀態都變為RUNNING, 并通知AMLivelinessMonitor服務監控AM是否存活狀態,當心跳超過一14.啟動Dispatcher(負責接收用戶提供的作業,并且負責為這個新提交的作業拉起一個新的JobManager)及相關服務(包括RESTendpoint等JobGraph文件;在session運行模式下,Dispatcher將在接收客戶端提15.根據JobGraph啟動JobManager(負責作業調度、管理Job和Task的生命周期構建ExecutionGraph(JobGraph的并行化版本,調度層最16.JobManager開始執行ExecutionGraph,向ResourceManager申請17.ResourceManager將資源請求加入等待請求隊列,并通過心跳向YARNRM申請新的Container資源來啟動TaskManager進程;后續流程如果有空閑Slot資源,SlotManager將其分配給等待請求隊列中匹配的請求,不用再通過18.YarnResourceManager申請新的TaskManager;18.YARNApplicationMasterService接收到資源請求后,解析出新的資源19.YARNResourceScheduler成功為該應用分配資源后更新應用信息,ApplicationMasterService接收到FlinkJobManager的下一次心跳時返20.FlinkResourceManager接收到新分配的Container資源后,準備好TaskManager啟動上下文(ContainerLauncherContext,生成Task-Manager配置并上傳至分布式存儲,配置其他依賴和環境變量等然后向YARNNM申請啟動TaskManager進程,YARNNM啟動Container的21.TaskManager進程加載并運行YarnTaskExecutorRunner(FlinkTaskManager入口類初始化流程完成后啟動TaskExecutor(負責執行Task相關操作22.TaskExecutor啟動后先向ResourceManager注冊,成功后再向SlotManager匯報自己的Slot資源與狀態;SlotManager接收到Slot空閑資源后主動觸發Slot分配,從等待請TaskManager請求該Slot資源23.TaskManager收到請求后檢查該Slot是否可分配(不存在則返回異常信24.JobManager檢查Slot分配是否重復,通過后通知Execution執行部署task流程,向TaskExecutor提交task;TaskExecutor啟動新的線程運行Task。FlinkRelease-1.9SourceCodeFlinkRelease-1.9Documents[FLIP-6-FlinkDeploymentandProcessModel-Standalone,Yarn,Mesos,Kubernetes,etc.]YARN3.2SourceCodeYARN3.2.0DocumentsFlinkonYARN(下常見問題與排查思路Flink支持Standalone獨立部署和YARN、Kubernetes、Mesos等集群部署模式,其中YARN集群部署模式在國內的應用越來越廣泛。Flink社區將推出FlinkonYARN應用解讀系列文章,分為上、下兩篇。上篇分享了基于FLIP-6重構后的資源調度模型介紹FlinkonYARN應用啟動全流程,本文將根據社區大群反饋,解答客戶端和FlinkCluster的常見問題,分享相關問題的排查思路。這個問題的迷惑性較大,很多時候并非指定運行的JAR文件問題,而是提交過程中發生了異常,需要根據日志信息進一步排查。最常見原因是未將依賴的Hadoop JAR文件加到CLASSPATH,找不到依賴類(例如:ClassNotFoundException:org.apache.hadoop.yarn.exceptions.YarnException)導致加載客戶端入口類(FlinkYarnSessionCli)失敗。FlinkonYARN客戶端通常需配置HADOOP_CONF_DIR和HADOOP_CLASSPATH兩個環境變量來讓客戶端能加載到Hadoop配置和依賴JAR文件。示例(已有環境變量HADOOP_HOME指定Hadoop部署目錄flink-${USER}-client-.log,使用log4j配置:${FLINK_HOME}/conf/log4j-cli.有的客戶端環境比較復雜,難以定位日志位置和配置時,可以通過以下環境變量配置打開log4j的DEBUG日志,跟蹤log4j的初始化和JVM_ARGS=”-Dlog4j.debug=true”為DEBUG后重新運行,看是否有DEBUG日志可以幫助排查問題。對于一些沒有日志或日志信息不完整的問題,可能需要開展代碼級調試,修改源碼重新打包替換的方式太過繁瑣,推薦使用Java字節碼注入工具By(1)編寫調試腳本,例如打印Flink實際使用的Client類,以下腳本表示在CliFrontend#getActiveCust(2)設置環境變量,使用bytemanjavaagent: (3)運行測試命令bin/flinkrun-myarn-cluster-p1./examples/stream-ing/WordCount.jar,控制臺將輸出內容:FlinkCluster常見問題與排查思路用戶應用和框架JAR包版本沖突問題該問題通常會拋出NoSuchMethodError/ClassNotFoundException/Incom-patibleClassChangeError等異常,要解決此類問題:1.首先需要根據異常類定位依賴庫,然后可以在項目中執行mvndepen-dency:tree以樹形結構展示全部依賴鏈,再從中定位沖突的依賴庫,也可以增加參數-Dincludes指定要顯示的包,格式為[groupId]:[artifactId]:[--Dincludes=power,javaassist;2.定位沖突包后就要考慮如何排包,簡單的方案是用exclusion來排除掉其從他依賴項目中傳遞過來的依賴,不過有的應用場景需要多版本共存,不同組件依賴不同版本,就要考慮用MavenShade插件來解決,詳情請參考MavenShadePlugin。依賴庫有多版本JAR包共存時如何確定某類的具體來源?很多應用運行CLASSPATH中存在相同依賴庫的多個版本JAR包,導致實際使用的版本跟加載順序有關,排查問題時經常需要確定某個類的來源JAR,Flink支持給JM/TM進程配置JVM參數,因此可以通過下面三個配置項來打印加載類及其Flink應用運行中的JM/TM日志可以在WebUI上查看,但是查問題時通常需要結合完整日志來分析排查,因此就需要了解YARN的日志保存機制,YARN上1.如果應用還沒有結束,Container日志會一直保留在其運行所在的節點上,即使Container已經運行完成仍然可以在所在節點的配置目錄下找到:2.如果應用已結束并且集群啟用了日志收集(yarn.log-aggregation-en-able=true),則通常應用結束后(也有配置可以增量上傳)NM會將其全部日志上傳至分布式存儲(通常是HDFS)并刪除本地文件,我們可以通志,還可以增加參數項-containerId-nodeAddress來查看某containerremote-app-log-dir}/${user}/${yarn.nodemanager.remote-app-log-dir-suffix}/Flink應用資源分配問題排查思路如果Flink應用不能正常啟動達到RUNNING狀處于NEW__SAVING狀態時正在進行應用信息持久化,如果持續處于這個如果處于SUBMITTED狀態,可能是RM內部發生一些hold讀寫鎖的耗時操作導致事件堆積,需要根據YARN集群日志 如果處于ACCEPTED狀態,需要先檢查AM是否正常,跳轉到步驟2;如果已經是RUNNING狀態,但是資源沒有全部拿到導致JOB無法正常Queue’sAMresourcelimitexceeded.原因是達到了隊列AM可用資源上限,即隊列的AM已使用資源和AM新申請資源之和超出了隊列的AM資源上限,可以適當調整隊列AM可用資源百分比的配置項:yarn.scheduler.capacity..maximum-am-resource-percent。User’sAMresourcelimitexceeded.原因是達到了應用所屬用戶在該隊新申請資源之和超出了應用所屬用戶在該隊列的AM資源上限,可以適當提高用戶可用AM資源比例來解決該問題,相關配置項:yarn.scheduler.capacity..user-limit-factor與yarn.scheduler.capacity..minimum-us-er-limit-percent。AMcontainerislaunched,wRM.大致原因是AM已啟動,但內部初始化未完成,可能有ApplicationisActivated,waitingforresourcestobeassignedforAM.3.確認應用確實有YARN未能滿足的資源請求:從應用列表頁點擊問題應用ID進入應用頁面,再點擊下方列表的應用實例ID進入應用實例頁面,看TotalOutstandingResourceRequests列表中是否有Pending資源,如果沒有,說明YARN已分配完畢,退出該檢查流程,轉去檢查AM;如果4.調度器分配問題排查,YARN-9050支持在WebUI上或通過RESTAPI自檢查集群或queue資源,scheduler頁面樹狀圖葉子隊列展開查看資源信息:EffectiveMaxResource、UsedResources1)檢查集群資源或所在隊列資源或其父隊列資源是否已用完2)檢查葉子隊列某維度資源是否檢查是否存在資源碎片1)檢查集群Used資源和Reserved資源之和占總資源的比例,當集群資源接近用滿時(例如90%以上可能存片的情況,應用的分配速度就會受影響變慢,因為大部分機器都沒有資源了,機器可用資源不足會被reserve,reserved資源達到一定規模后可能導致大部分機器資源被鎖定,后續分配可能就會變慢2)檢查NM

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論