




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
6.狀態和檢查點本章將重點圍繞狀態、檢查點(Checkpoint)和保存點(Savepoint)三個概念來介紹如何在Flink上進行有狀態的計算。在Flink架構體系中,有狀態計算可以說是Flink非常重要的特性之一。有狀態計算是指在程序計算過程中,在Flink程序內部存儲計算產生的中間結果,并提供給后續Function或算子計算結果使用。檢查點是Flink保證exactly-once的重要特性。通過本節學習您將可以:掌握Flink中幾種常用的狀態以及具體使用方法。掌握Checkpoint機制的原理和配置方法。了解Savepoint機制的原理和使用方法。實現有狀態的計算Checkpoint原理及配置方法Savepoint原理及使用方法
什么是有狀態的計算有狀態計算的潛在場景數據去重:需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入數據去重檢查輸入流是否符合某個特定模式:之前流入的數據以狀態的形式緩存下來對一個窗口內的數據進行聚合分析,比如分析一小時內某項指標75分位值或99分位值Flink分布式計算,一個算子有多個算子子任務狀態可以被理解為某個算子子任務在當前實例上的一個變量,變量記錄了數據流的歷史信息,新數據流入,可以結合歷史信息來進行計算接收輸入流/獲取對應狀態/更新狀態狀態管理的難點要解決問題:實時性,延遲不能太高數據不丟不重、恰好計算一次,尤其發生故障恢復后程序的可靠性要高,保證7*24小時穩定運行難點不能將狀態直接交由內存,因為內存空間有限用持久化的系統備份狀態,出現故障時,如何從備份中恢復需要考慮擴展到多個節點時的伸縮性Flink解決了上述問題,提供有狀態的計算APIManaged
State和Raw
State托管狀態(ManagedState)是由Flink管理的,Flink幫忙存儲、恢復和優化原生狀態(RawState)是開發者自己管理的,需要自己序列化Managed
State又細分為Keyed
State和Operator
StateFlink的幾種狀態類型
ManagedStateRawState狀態管理方式FlinkRuntime托管,自動存儲、自動恢復、自動伸縮用戶自己管理狀態數據結構Flink提供的常用數據結構,如ListState、MapState等字節數組:byte[]使用場景絕大多數Flink函數用戶自定義函數Keyed
State是KeyedStream上的狀態,每個Key共享一個狀態OperatorState每個算子子任務共享一個狀態Keyed
State和Operator
StateKeyed
State相同Key的數據可以訪問、更新這個狀態Operator
State流入這個算子子任務的所有數據可以訪問、更新這個狀態Keyed
State和Operator
State都是基于本地的,每個算子子任務維護著自身的狀態,不能訪問其他算子子任務的狀態具體的實現層面,Keyed
State需要重寫Rich
Function函數類,Operator
State需要實現CheckpointedFunction等接口Keyed
State和Operator
State
KeyedStateOperatorState適用算子類型只適用于KeyedStream上的算子可以用于所有算子狀態分配每個Key對應一個狀態一個算子子任務對應一個狀態創建和訪問方式重寫RichFunction,通過里面的RuntimeContext訪問實現CheckpointedFunction等接口橫向擴展狀態隨著Key自動在多個算子子任務上遷移有多種狀態重新分配的方式支持的數據結構ValueState、ListState、MapState等ListState、BroadcastState等修改Flink應用的并行度:每個算子的并行算子子任務數發生了變化,整個應用需要關停和啟動一些算子子任務某份在原來某個算子子任務的狀態需要平滑更新到新的算子子任務上Flink的Checkpoint可以輔助狀態數據在算子子任務之間遷移算子子任務生成快照(Snapshot)保存到分布式存儲上子任務重啟后,相應的狀態在分布式存儲上重建(Restore)Keyed
State與Operator
State的橫向擴展方式稍有不同橫向擴展問題Flink提供了封裝好的數據結構供我們使用,包括ValueState、ListState等主要有:ValueState:單值MapState:Key-Value對ListState:列表ReducingState和AggregatingState:合并Keyed
State由于跟Key綁定,Key自動分布到不同算子子任務,Keyed
State也可以根據Key分發到不同算子子任務上Keyed
State實現RichFunction函數類,比如RichFlatMapFunction創建StateDescriptor,StateDescriptor描述狀態的名字和狀態的數據結構,每種類型的狀態有對應的StateDescriptor通過StateDescriptor,從RuntimeContext中獲取狀態調用狀態提供的方法,獲取狀態,更新數據Keyed
State//創建StateDescriptor
MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運行時上下文中的狀態
behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);MapState<UK,UV>:UVget(UKkey)voidput(UKkey,UVvalue)booleancontains(UKkey)…案例:統計電商用戶行為UserBehavior場景下,某個用戶(userId)下某種用戶行為(behavior)的數量Keyed
State/**
*MapStateFunction繼承并實現RichFlatMapFunction*兩個泛型分別為輸入數據類型和輸出數據類型*/
publicstaticclass
MapStateFunction
extends
RichFlatMapFunction<UserBehavior,Tuple3<Long,String,Integer>>{//指向MapState的句柄
privateMapState<String,Integer>behaviorMapState;@Overridepublicvoidopen(Configurationconfiguration){//創建StateDescriptor
MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運行時上下文中的狀態
behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);}@OverridepublicvoidflatMap(UserBehaviorinput,Collector<Tuple3<Long,String,Integer>>out)throwsException{intbehaviorCnt=1;//behavior有可能為pv、cart、fav、buy等
//判斷狀態中是否有該behavior
if(behaviorMapState.contains(input.behavior)){behaviorCnt=behaviorMapState.get(input.behavior)+1;}//更新狀態
behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt));}}使用MapState記錄某個behavior下的數量<behavior,
behaviorCnt>UserBehavior案例先基于userId進行keyBy再使用有狀態的MapStateFunction進行處理Keyed
Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorStream=...//生成一個KeyedStream
KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進行flatMap
DataStream<Tuple3<Long,String,Integer>>behaviorCountStream=keyedStream.flatMap(newMapStateFunction());狀態:算子子任務的本地數據在Checkpoint過程時寫入存儲,這個過程被稱為備份(Snapshot)初始化或重啟一個Flink作業時,以一定邏輯從存儲中讀出并變為算子子任務的本地數據,這個過程被稱為重建(Restore)Keyed
State開箱即用:數據劃分基于Key,Snapshot和Restore過程可以基于Key在多個算子子任務之間做數據遷移Operator
State每個算子子任務管理自己的狀態,流入到這個算子子任務上的所有數據可以訪問和修改Operator
State故障重啟后,數據流中某個元素不一定流入重啟前的算子子任務上需要根據具體業務場景設計Snapshot和Restore的邏輯使用CheckpointedFunction接口類Operator
StateFlink定期執行Checkpoint,會將狀態數據Snapshot到存儲上每次執行Snapshot,會調用snapshotState()方法,因此我們要實現一些Snapshot邏輯,比如將哪些狀態持久化initializeState()在算子子任務初始化狀態時調用,有兩種被調用的可能:整個Flink作業第一次執行,狀態數據需要初始化一個默認值Flink作業遇到故障重啟,基于之前已經持久化的狀態恢復ListState
/
UnionListStateBroadcastStateOperator
Statepublic
interface
CheckpointedFunction{//Checkpoint時會調用這個方法,我們要實現具體的snapshot邏輯,比如將哪些本地狀態持久化
void
snapshotState(FunctionSnapshotContextcontext)
throwsException;//初始化時會調用這個方法,向本地狀態中填充數據
void
initializeState(FunctionInitializationContextcontext)
throwsException;}CheckpointedFunction源碼狀態以列表的形式序列化并存儲單個狀態為S,每個算子子任務有零到多個狀態,共同組成一個列表ListState[S],Snapshot時將這些狀態以列表形式寫入存儲包含所有狀態的大列表,當作業重啟時,將這個大列表重新分布到各個算子子任務上ListState:將大列表按照Round-Ribon模式均勻分布到各個算子子任務上,每個算子子任務得到的是大列表的子集UnionListState:將大列表廣播給所有算子子任務應用場景:Source上保存流入數據的偏移量,Sink上對輸出數據做緩存Operator
State
–
ListState、UnionListStateOperator
State使用方法重點實現snapshotState()和initializeState()兩個方法在initializeState()方法里初始化并獲取狀態注冊StateDescriptor,指定狀態名字和數據類型從FunctionInitializationContext中獲取OperatorStateStore,進而獲取Operator
State在snapshotState()方法里實現一些業務邏輯基于ListState實現可緩存的Sink//重寫CheckpointedFunction中的snapshotState()
//將本地緩存Snapshot到存儲上
@OverridepublicvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{//將之前的Checkpoint清理
checkpointedState.clear();for(Tuple2<String,Integer>element:bufferedElements){//將最新的數據寫到狀態中
checkpointedState.add(element);}}//重寫CheckpointedFunction中的initializeState()
//初始化狀態
@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{//注冊ListStateDescriptor
ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));//從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState
checkpointedState=context.getOperatorStateStore().getListState(descriptor);//如果是作業重啟,讀取存儲中的狀態數據并填充到本地緩存中
if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}Sink先將數據放在本地緩存中,并定期通過snapshotState()方法進行SnapshotinitializeState()初始化狀態,需判斷是新作業還是重啟作業snapshotState()initializeState()Broadcast可以將部分數據同步到所有實例上使用場景:一個主數據流,一個控制規則流,主數據流比較大,只能分散在多個算子實例上,控制規則流數據比較小,可以廣播分發到所有算子實例上。與Join的區別:控制規則流較小,可以放到每個算子實例里電商用戶行為分析案例:識別用戶行為模式,行為模式包括“反復猶豫下單類”、“頻繁爬取數據類”等,控制流里包含了這些行為模式,使用Flink實時識別用戶Broadcast
State主邏輯中讀取兩個數據流Broadcast
State支持Key-Value形式,需要使用MapStateDescriptor來構建再使用broadcast()方法將數據廣播到所有算子子任務上,得到BroadcastStream主數據流先進行keyBy(),然后與廣播流合并,在KeyedBroadcastProcessFunction中實現具體業務邏輯BroadcastPatternFunction是KeyedBroadcastProcessFunction的具體實現Broadcast
State//主數據流
DataStream<UserBehavior>userBehaviorStream=...//BehaviorPattern數據流
DataStream<BehaviorPattern>patternStream=...//BroadcastState只能使用Key->Value結構,基于MapStateDescriptor
MapStateDescriptor<Void,BehaviorPattern>broadcastStateDescriptor=newMapStateDescriptor<>("behaviorPattern",Types.VOID,Types.POJO(BehaviorPattern.class));BroadcastStream<BehaviorPattern>broadcastStream=patternStream.broadcast(broadcastStateDescriptor);//生成一個KeyedStream
KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進行connect()和process()
DataStream<Tuple2<Long,BehaviorPattern>>matchedStream=keyedStream.connect(broadcastStream).process(newBroadcastPatternFunction());processElement()方法處理主數據流中的每條元素,輸出零到多個數據processBroadcastElement()方法處理廣播流,可以輸出零到多個數據,一般用來更新BroadcastStateKeyedBroadcastProcessFunction屬于ProcessFunction系列函數,可以注冊Timer,并在onTimer方法中實現回調邏輯。KeyedBroadcastProcessFunction實現有狀態的計算Checkpoint原理及配置方法Savepoint原理及使用方法Flink的狀態是基于本地的,本地狀態數據不可靠Checkpoint機制:Flink定期將狀態數據保存到存儲上,故障發生后將狀態數據恢復。快照(Snapshot)、分布式快照(DistributedSnapshot)和檢查點(Checkpoint)均指的是Flink將狀態寫入存儲的過程一個簡單的Checkpoint流程:暫停處理新流入數據,將新數據緩存下來將算子子任務的本地狀態數據拷貝到一個遠程的持久化存儲上繼續處理新流入的數據,包括剛才緩存起來的數據Checkpoint機制檢查點分界線(CheckpointBarrier)被插入到數據流中,將數據流切分成段。Flink的算子接收到CheckpointBarrier后,對狀態進行快照。每個CheckpointBarrier有一個ID,表示該段數據屬于哪次Checkpoint。當ID為n的CheckpointBarrier到達每個算子后,表示要對n-1和n之間狀態更新做快照。
Checkpoint
Barrier構建并行度為2的數據流圖Flink的檢查點協調器(CheckpointCoordinator)觸發一次Checkpoint(TriggerCheckpoint),這個請求會發送給Source的各個子任務。分布式快照流程各Source算子子任務接收到這個Checkpoint請求之后,會將自己的狀態寫入到狀態后端,生成一次快照向下游廣播CheckpointBarrier分布式快照流程Source算子做完快照后,還會給CheckpointCoodinator發送一個確認(ACK)ACK中包括了一些元數據,包括備份到State
Backend的狀態句柄(指向狀態的指針)Source算子完成了一次Checkpoint分布式快照流程對于下游算子來說,可能有多個與之相連的上游輸入。一個輸入被稱為一條通道。Id為n的Checkpoint
Barrier會被廣播到多個通道。不同通道的Checkpoint
Barrier傳播速度不同。需要進行對齊(BarrierAlignment)對齊分四步:1
.算子子任務在某個輸入通道中收到第一個ID為n的CheckpointBarrier,其他輸入通道中ID為n的CheckpointBarrier還未到達。2
.算子子任務將第一個輸入通道的數據緩存下來,同時繼續處理其他輸入通道的數據,這個過程被稱為對齊。3
.第二個輸入通道的CheckpointBarrier抵達該算子子任務,該算子子任務執行快照,將狀態寫入StateBackend,然后將ID為n的CheckpointBarrier向下游所有輸出通道廣播。4
.對于這個算子子任務,快照執行結束,繼續處理各個通道中新流入數據,包括剛才緩存起來的數據。Checkpoint
Barrier對齊每個算子都要執行一遍上述的對齊、快照、確認的工作最后的Sink算子發送確認后,說明ID為n的Checkpoint執行結束,CheckpointCoordinator向StateBackend寫入一些本次Checkpoint的元數據Checkpoint完成CheckpointBarrier對齊時,必須等待所有上游通道都處理完。假如某個上游通道處理很慢,這可能造成整個數據流堵塞。一個算子子任務不需要等待所有上游通道的CheckpointBarrier,直接將CheckpointBarrier廣播,算子子任務直接執行快照并繼續處理后續流入數據。Flink必須將那些上下游正在傳輸的數據也作為狀態保存到快照中。開啟Unaligned
Checkpoint:Unaligned
Checkpoint優缺點:不需要對齊,Checkpoint速度快傳輸數據也要快照,狀態數據大,磁盤負載加重,重啟后狀態恢復時間過長,運維管理難度大Unaligned
Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();
每次執行數據快照時,不需要暫停新流入數據。Flink啟動一個后臺線程,它創建本地狀態的一份復制,這個線程用來將本地狀態的復制同步到StateBackend上,一旦數據同步完成,再給CheckpointCoordinator發送確認信息。該過程被稱為異步快照(AsynchronousSnapshot)。利用寫入時復制(Copy-on-Write):如果這份內存數據沒有任何修改,那沒必要生成一份復制,如果這份內存數據有一些更新,那再去申請額外的內存空間并維護兩份數據,一份是快照時的數據,一份是更新后的數據。是否開啟異步快照可配置。異步快照State
Backend用來持久化狀態數據Flink內置三種State
Backend:MemoryStateBackendFsStateBackendRocksDBStateBackendState
Backend基于內存,數據存儲在Java的堆區。進行分布式快照時,所有算子子任務會將自己內存上的狀態同步到JobManager的堆上,因此一個作業的所有狀態要小于JobManager的內存大小,否則將拋出OutOfMemoryError異常。只適合調試或者實驗,不建議在生產環境下使用。如果不做其他聲明,默認情況是使用這種模式作為StateBackend。設置使用內存作為State
Backend,MAX_MEM_STATE_SIZE為設置的狀態的最大值:MemoryStateBackendenv.setStateBackend(newMemoryStateBackend(MAX_MEM_STATE_SIZE));基于文件系統,數據最終持久化到文件系統上文件系統包括本地磁盤、HDFS、Amazon、阿里云等在內的云存儲服務,使用時需要提供文件系統的地址,寫明前綴:file://、hdfs://或s3://默認開啟異步快照本地的狀態在TaskManager的堆內存上,執行快照時狀態數據會寫到文件系統上FsStateBackend//使用HDFS作為StateBackend
env.setStateBackend(newFsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));//使用阿里云OSS作為StateBackend
env.setStateBackend(newFsStateBackend("oss://<your-bucket>/<object-name>"));//使用Amazon作為StateBackend
env.setStateBackend(newFsStateBackend("s3://<your-bucket>/<endpoint>"));//關閉AsynchronousSnapshot
env.setStateBackend(newFsStateBackend(checkpointPath,false));本地狀態存儲在本地RocksDB上,Checkpoint時將RocksDB數據再寫到遠程的存儲上,因此需要配置一個分布式存儲的地址。本地狀態基于RocksDB,可以突破內存空間的限制,可存儲的狀態量更大。但RocksDB需要序列化和反序列化,讀寫時間成本高。支持增量快照(IncrementalCheckpoint):只對發生變化的數據增量寫到分布式存儲上,而不是將所有的本地狀態都拷貝過去。非常適合超大規模的狀態。但重啟恢復的時間更長。需要手動開啟:RocksDBStateBackend//開啟IncrementalCheckpoint
booleanenableIncrementalCheckpointing=true;env.setStateBackend(newRocksDBStateBackend(checkpointPath,enableIncrementalCheckpointing));默認情況下,Checkpoint機制是關閉的,開啟:n表示每隔n毫秒進行一次CheckpointCheckpoint耗時可能比較長,n設置過小,有可能出現一次Checkpoint還沒完成,下次Checkpoint已經被觸發,n設置過大,如果重啟,整個作業需要從更長的Offset開始重新處理數據開啟Checkpoint,使用Checkpoint
Barrier對齊功能,可以提供Exactly-Once語義At-Least-Once語義:不使用Checkpoint
Barrier對齊功能,但某些數據可能被處理多次一些其他Checkpoint設置,在CheckpointConfig中設置:Checkpoint相關配置env.enableCheckpointing(n)//使用At-Least-OncecheckpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);CheckpointConfigcheckpointCfg=env.getCheckpointConfig();重啟恢復流程:重啟應用,在集群上重新部署數據流圖。從持久化存儲上讀取最近一次的Checkpoint數據,加載到各算子子任務上。繼續處理新流入的數據。作業故障重啟,會暫停一段時間,這段時間上游數據仍然會繼續發送過來,作業重啟后,需要消化這些未處理的數據。重啟恢復流程由于異常導致故障,異常根源不消除,重啟后仍然出現故障,因此要避免無限次重啟。固定延遲(FixedDelay):作業每次失敗后,按照設定的時間間隔進行重啟嘗試,重啟次數不會超過某個設定值失敗率(FailureRate):計算一個時間段內作業失敗的次數,如果失敗次數小于設定值,繼續重啟,否則不重啟不重啟(NoRestart)在conf/flink-conf.yaml設置或者在代碼中設置三種重啟策略實現有狀態的計算Checkpoint原理及配置方法Savepoint原理及使用方法Checkpoint和Savepoint生成的數據近乎一樣Checkpoint目的是為了故障重啟,使得重啟前后作業狀態一致Savepoint目的是手動備份數據,以便進行調試、遷移、迭代等狀態數據從零積累成本很高迭代:在初版代碼的基礎上,保留狀態到Savepoint中,方便修改業務邏輯遷移:把程序遷移到新的機房、集群等有計劃地備份、停機,手動管理和刪除狀態數據場景:同一個作業不斷調整并行度,以找到最優方案進行A/B實驗,使用相同的狀態數據測試不同的程序版本Savepoint與Checkpoint的區別每個算子應該分配一個唯一ID,Savepoint中的狀態數據以算子ID來存儲和區分不設置ID,Flink自動為其分配一個ID算子IDDataStream<X>stream=env.//一個帶有OperatorState的Source,例如KafkaSource
.addSource(newStatefulSource()).uid(“source-id”)
//算子ID
.keyBy(...)//一個帶有KeyedState的StatefulMap
.map(newStatefulMapper()).uid(“mapper-id”)
//算子ID
//print是一種無狀態的Sink
.print();//Flink為其自動分配一個算子ID對某個作業的狀態進行備份,將Savepoint目錄保存到某個目錄下:從某個Savepoint目錄中恢復一個作業:備份和恢復$
./bin/flinksavepoint<jobId>[savepointDirectory]$
./bin/flinkrun-s<savepointPath>[OPTIONS]<xxx.jar>StateProcessorAPI:基于DataSet
API,讀寫和修改Savepoint數據Savepoint以一定的Schema存儲,像讀寫數據庫一樣讀寫SavepointReaderFunction是一個KeyedStateReaderFunction的實現,需要實現open()和readKey()方法:open()方法中注冊StateDescriptorreadKey()方法中逐Key讀取數據,輸出到Collector中從Savepoint中讀數據Savepoint中的數據存儲形式DataSet<Integer>listState=savepoint.readListState<>("source-id","os1",Types.INT);//ReaderFunction需要繼承并實現KeyedStateReaderFunction
DataSet<KeyedState>keyedState=savepoint.readKeyedState("mapper-id",newReaderFunction());向Savepoint中寫入狀態,適合作業冷啟動構建BootstrapTransformation操作,是一個狀態寫入的過程,可以理解為流處理時使用的有狀態的算子withOperator()向Savepoint中添加算子,參數分別為:算子ID一個BootstrapTransformationKeyed
State和Operator
State的BootstrapTransformation實現不同向Savepoint寫數據ExecutionEnvironmentbEnv=ExecutionEnvironment.getExecutionEnvironment();//最大并行度
intmaxParallelism=128;StateBackendbackend=...//準備好寫入狀態的數據
DataSet<Account>accountDataSet=bEnv.fromCollection(accounts);//構建一個BootstrapTransformation,將accountDataSet寫入
BootstrapTransformation<Account>transformation=OperatorTransformation.bootstrapWith(accountDataSet).keyBy(acc->acc.id).transform(newAccountBootstrapper());//創建算子,算子ID為accountsSavepoint.create(backend,maxParallelism).withOperator("accounts",transformation).write(savepointPath);bEnv.execute("bootstrap");Operator
State要實現StateBootstrapFunction實現processElement()方法,每來一個輸入,processElement()方法會被調用一次,用于將數據寫入Savepoint。Operator
State:StateBootstrapFunction/**
*繼承并實現StateBootstrapFunction
*泛型參數為輸入類型*/
public
class
SimpleBootstrapFunction
extends
StateBootstrapFunction<Integer>{privateListState<Integer>state;//每個輸入都會調用一次processElement,這里將輸入加入到狀態中
@Overridepublic
void
processElement(Integervalue,Contextctx)
throwsException{state.add(value);}@Overridepublic
void
snapshotState(FunctionSnapshotContextcontext)
throwsException{}//獲取狀態句柄
@Overridepublic
void
initializeState(FunctionInitializationContextcontext)
throwsException{state=context.getOperatorState().getListState(newListStateDescriptor<>("state",Types.INT));}}KeyedState要實現KeyedStateBootstrapFunction實現processElement()方法,每來一個輸入,processElement()方法會被調用一次,用于將數據寫入Savepoint。KeyedState:KeyedStateBootstrapFunction/**
*AccountBootstrapper繼承并實現了KeyedStateBootstrapFunction
*第一個泛型Integer為Key類型*第二個泛型Account為輸入類型*/
public
class
AccountBootstrapper
extends
KeyedStateBootstrapFunction<Integer,Account>{ValueState<Double>state;//獲取狀態句柄
@Overridepublic
void
open(Configurationparameters)
{ValueStateDescriptor<Double>descriptor=new
ValueStateDescriptor<>("total",Types.DOUBLE);state=getRuntimeContext().getState(descriptor);}//每個輸入都會調用一次processElement()@Overridepublic
void
processElement(Accountvalue,Contextctx)
throwsException{state.update(value.amount);}}從已有的Savepoint上修改,保存。removeOperator()將一個算子狀態數據從Savepoint中刪除。withOperator()方法增加了一個算子。write()方法將數據寫入一個路徑下。修改SavepointBootstrapTransformation<Integer>transformation=OperatorTransformation.bootstrapWith(data).transform(newModifyProcessFunction());Savepoint.load(bEnv,savepointPath,backend)//刪除名為currency的算子
.removeOperator("currency")//增加名為numbers的算子,使用transformation構建其狀態數據
.withOperator("number",transformation)//新的Savepoint會寫到modifyPath路徑下
.write(modifyPath);本章中我們和讀者介紹了,如何進行有狀態的計算。Flink中的狀態主要包括:KeyedState和OperatorState。狀態可以借助Checkpoint或Savepoint機制被持久化保存到存儲空間上,Checkpoint用于故障恢復,Savepoint用于狀態的迭代更新。7.Flink連接器本章將詳細介紹Flink的Connector相關知識。在實際生產環境中,數據可能存放在不同的系統中,比如文件系統、數據庫或消息隊列。一個完整的Flink作業包括Source和Sink兩大模塊,Source和Sink肩負著Flink與外部系統進行數據交互的重要功能,它們又被稱為連接器(Connector)。通過本節學習您將可以:掌握Flink端到端的Exactly-Once保障。掌握自定義Source和Sink。熟悉Flink中常用的Connector,如文件系統、Kafka等。端到端的Exactly-Once自定義Source和Sink常用流式連接器
端到端Exactly-OnceExactly-Once:某條數據投遞到某個流處理系統后,該系統對這條數據只處理一次有數據重發(Replay)問題:作業重啟后,Source必須從某個Offset位置重新發送數據數據重發會導致一條輸入數據可能多次影響下游系統,有可能產生At-Least-Once的效果,沒有達到Exactly-Once的效果為了達到端到端的Exactly-Once,必須:Source有重發功能Sink支持冪等寫或事務寫冪等寫(IdempotentWrite):任意多次向一個系統寫入數據,只對目標系統產生一次結果影響:重復向一個HashMap里插入同一個Key-Value對,第一次插入時這個HashMap發生變化,后續的插入操作不會改變HashMap的結果。Key-Value必須是可確定性(Deterministic)計算的:Key為name
+
curTimestamp,curTimestamp一直變化,Key非可確定性Key為name+eventTimestamp,Event
Time確定,Key可確定性有短暫的數據閃回現象:只有當后續所有數據都重發一遍后,所有應該被覆蓋的Key都被最新數據覆蓋后,整個系統才達到數據的一致狀態。冪等寫事務寫(TransactionWrite):Flink先將待輸出的數據保存下來暫時不向外部系統提交,等待Checkpoint結束的時刻,Flink上下游所有算子的數據都是一致時,將之前保存的數據全部提交(Commit)到外部系統:預寫日志(Write-Ahead-Log,WAL)兩階段提交(Two-Phase-Commit,2PC)Write-Ahead-Log方式使用OperatorState緩存待輸出的數據Two-Phase-Commit方式需要外部系統自身就支持事務(比如Kafka)端到端的Exactly-Once,犧牲了低延遲,數據分批次地提交事物寫端到端的Exactly-Once自定義Source和Sink常用流式連接器Flink在1.11對Source進行了重構,改動較大,之前的稱為老Source接口,之后的稱為新Source接口老Source接口實現SourceFunction:接口類SourceFunctionRich函數類RichSourceFunction必須實現兩個方法:run()和cancel()方法:run()方法:Source啟動后開始運行,在方法中使用循環,循環內不斷向下游發送數據cancel()方法:停止向下游繼續發送數據老Source接口//Source啟動后調用run方法,生成數據向下游發送
void
run(SourceContext<T>ctx)
throwsException;//停止
void
cancel();使用標志位isRunning標記Source是否在運行run()方法內一直循環,使用SourceContext.collect()方法收集數據,發送到下游停止Source時,要修改標志位isRunning主邏輯中調用:老Source接口private
static
class
SimpleSource
implements
SourceFunction<Tuple2<String,Integer>>{private
intoffset=0;private
booleanisRunning=true;@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){Thread.sleep(500);ctx.collect(newTuple2<>(""+offset,offset));offset++;if(offset==1000){isRunning=false;}}}@Overridepublic
void
cancel()
{isRunning=false;}}自定義Source:將數字發送到下游DataStream<Tuple2<String,Integer>>countStream=env.addSource(newSimpleSource());前頁的例子沒有進行任何Checkpoint,重啟后從0重新開始,為了整個作業重啟后可恢復,Source需要支持重發,將Offset作為狀態記錄下來使用Operator
State記錄Offset,需要繼承CheckpointedFunction接口類,實現snapshotState()和initializeState()方法整個作業第一次啟動時,調用initializeState()方法,offset為0,之后每隔一段時間調用snapshotState()將狀態數據進行Checkpoint可恢復的Source@Overridepublic
void
snapshotState(FunctionSnapshotContextsnapshotContext)
throwsException{//清除上次狀態
offsetState.clear();//將最新的offset添加到狀態中
offsetState.add(offset);}@Overridepublic
void
initializeState(FunctionInitializationContextinitializationContext)
throwsException{//初始化offsetState
ListStateDescriptor<Integer>desc=newListStateDescriptor<Integer>("offset",Types.INT);offsetState=initializationContext.getOperatorStateStore().getListState(desc);Iterable<Integer>iter=offsetState.get();if(iter==null||!iter.iterator().hasNext()){//第一次初始化,從0開始計數
offset=0;}else{//從狀態中恢復offset
offset=iter.iterator().next();}}privateListState<Integer>offsetState;在Source發送數據時也設置數據對應的時間戳,并生成Watermark:collectWithTimestamp()方法,發送數據的同時也設置時間戳emitWatermark()方法,生成Watermark越早設置時間戳和Watermark,越能保證整個作業在時間序列上的準確性和健壯性時間戳和Watermark@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){ Thread.sleep(100);//將系統當前時間作為該條數據的時間戳發送出去
ctx.collectWithTimestamp( newTuple2<>(""+offset,offset),System.currentTimeMillis());offset++;//每隔一段時間,發送一個Watermark
if(offset%100==0){ctx.emitWatermark(newWatermark(System.currentTimeMillis()));}if(offset==1000){isRunning=false;}}}老Source接口只適合流處理,不適合批處理,新的Source接口統一了流批處理,提供了更大規模并行處理能力三個組件:分片(Split):將數據源切分后的一小部分。讀取器(SourceReader):在TaskManager上,負責Split的讀取和處理,可分布式地并行運行。例如,單個SourceReader可以讀取文件夾里的單個文件,多個SourceReader實例共同完成讀取整個文件夾的任務。分片枚舉器(SplitEnumerator):在JobManager上,負責發現和分配Split,按照負載均衡策略將多個Split分配到多個SourceReader。新Source接口類SinkFunctionRich函數類RichSinkFunction實現invoke()方法如果想達到端到端Exactly-Once,需要實現冪等寫和事務寫冪等寫:使用一些Key-Value存儲,并設計好Key,采用更新插入(Upsert)方式,將舊數據覆蓋事務寫:Write-Ahead-Log、Two-Phase-Commit
自定義Sink//每條數據到達Sink后都會調用invoke方法,發送到下游外部系統
//value為待輸出數據
void
invoke(INvalue,Contextcontext)在數據寫入到下游系統之前,先把數據以日志(Log)的形式緩存下來,等收到明確的確認提交信息后,再將Log中的數據提交到下游系統0、兩次Checkpoint之間的待輸出數據組成一個批次,待輸出批次緩存在Sink的Operator
State中1、接收到新的CheckpointBarrier
2、開啟一個新待輸出批次3、Sink向CheckpointCommitter查詢某批次是否已經提交。CheckpointCommitter是一個與外部系統緊密相連的插件,里面存儲了各批次數據是否已經寫入到外部系統4、Sink得知某批次數據還未提交,則使用sendValues()方法,發送待輸出的數據到外部系統5、提交成功后,Sink會刪除OperatorState中存儲的這些數據Write-Ahead-Log待輸出數據直接寫入外部系統,與外部系統一起協作提供事物寫功能0、Sink直接將待發送數據寫到外部系統的第k次事務(Transaction)中1、接收到新的CheckpointBarrier
2、preCommit()將第k次Transaction的數據預提交到外部系統中,數據寫到外部系統,但是并未確認,外部系統也不可見3、beginTransaction()方法,開啟下一次Transaction(Transactionk+1),在這之后的上游算子流入的待輸入數據都將流入新的Transaction(k+1)4、第2步和第3步完成后,執行commit()方法,確認提交Transaction
k,該批次數據在外部可見Two-Phase-Commit端到端的Exactly-Once自定義Source和Sink常用流式連接器內置I/O(Input/Output)接口flink-connector項目所涉及的ConnectorApacheBahir所提供的Connector
系統類型:消息隊列、數據庫、文件系統具體技術:Kafka、Elasticsearch、HBase、Cassandra、JDBC、Kinesis、Redis
…常用流式連接器基于Socket的Source和Sink無法實現數據重發,適合用來調試基于內存集合的Source打印到標準輸出的Sinkprint()打印到STDOUTprintToErr()打印到STDERR數據類型要實現toString()方法實際是在TaskManager上執行內置I/O接口//讀取Socket中的數據,數據流元素之間用\n來切分
env.socketTextStream(hostname,port,"\n");//向Socket中寫數據,數據以SimpleStringSchema序列化
stream.writeToSocket(outputHost,outputPort,newSimpleStringSchema());DataStream<Integer>sourceDataStream=env.fromElements(1,2,3);從內存集合讀取數據從Socket中讀取數據通過文件系統描述符來確定使用什么文件系統:hdfs://、s3://周期性檢測功能:每隔一定時間周期性地檢查filePath路徑下的內容是否有更新基于文件系統的SourceStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StringtextPath=...//以UTF-8編碼格式讀取文件
DataStream<String>text=env.readTextFile(textPath)//文件路徑
StringfilePath=...//文件為純文本格式
TextInputFormattextInputFormat=newTextInputFormat(neworg.apache.flink.core.fs.Path(filePath));//每隔100毫秒檢測一遍
DataStream<String>inputStream=env.readFile(textInputFormat,filePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100);簡單接口復雜接口writeAsText()方法:無法進行Checkpoint,逐漸被廢棄StreamingFileSink行式存儲和列式存儲桶:輸出路徑的子文件夾可以按時間分桶基于文件系統的SinkDataStream<Address>stream=env.addSource(...)//使用StreamingFileSink將DataStream輸出為一個文本文件
StreamingFileSink<String>fileSink=StreamingFileSink.forRowFormat(newPath("/file/base/path"),
newSimpleStringEncoder<String>("UTF-8")).build();stream.addSink(fileSink);[base-path]/[bucket-path]/part-[task-id]-[id]/file/base/path└──2020-02-25--15├──part-0-0.inprogress.92c7be6f-8cfc-4ca3-905b-91b0e20ba9a9├──part-1-0.inprogress.18f9fa71-1525-4776-a7bc-fe02ee1f2ddaStreamingFileSink接口桶的文件夾結構Kafka:被廣泛使用的消息隊列,非常具有代表性可以作為Flink的上游,此時要構建Flink的Source;也可以作為Flink的下游,此時要構建Flink的Sink不在Flink核心程序中,使用時需要額外在Maven中添加依賴Flink
Kafka
Connector<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version></dependency>Kafka是一個Producer,Flink作為Kafka的Consumer消費Kafka中的數據創建FlinkKafkaConsumer需要三個參數:Topic、反序列化方式和Kafka相關參數Kafka中傳輸的是二進制的數據,需要提供一個反序列化方式,將數據轉化為具體的Java或Scala對象開啟Flink
Checkpoint后,Checkpoint會記錄Offset,以進行故障恢復Flink
Kafka
Source//Kafka參數
Propertiesproperties=newProperties();properties.setProperty("bootstrap.servers","localhost:9092");properties.setProperty("group.id","flink-group");StringinputTopic="Shakespeare";//Source
FlinkKafkaConsumer<String>consumer=newFlinkKafkaConsumer<String>(inputTopic,newSimpleStringSchema(),properties);DataStream<String>stream=env.addSource(consumer);Flink是Producer,向Kafka輸出數據創建FlinkKafkaProducer需要四個參數:Topic、序列化方式、Kafka相關參數以及投遞保障序列化方式將Java/Scala對象轉化為Kafka中的二進制數據三種投遞保障:NONE:不提供任何保障,數據可能會丟失也可能會重復。AT_LEAST_ONCE:保證不丟數據,但是有可能會有重復。EXACTLY_ONCE:基于Kafka提供的事務寫功能,一條數據最終只寫入Kafka一次。FlinkKafkaSinkDataStream<Tuple2<String,Integer>>wordCount=...FlinkKafkaProducer<Tuple2<String,Integer>>producer=newFlinkKafkaProducer<Tuple2<String,Integer>>(outputTopic,newKafkaWordCountSerializationSchema(outputTopic),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年養老機構醫養結合運營模式創新與可持續發展報告001
- 紡織服裝行業智能化生產對企業管理創新研究報告
- 天然植物精油護膚品牌在2025年銷售渠道拓展研究報告
- 2025年元宇宙社交平臺虛擬現實教育游戲化市場前景:用戶體驗與教育效果報告
- 2025年醫院電子病歷系統在醫院信息化中的數據加密優化報告
- 2025年工業互聯網平臺網絡隔離技術:工業互聯網安全防護市場前景分析報告001
- 2025年醫藥行業CRO模式下的臨床試驗倫理審查與合規報告
- 新一代大學英語(第二版)綜合教程1-U4-教師用書 Unit 4 Life at your fingertips
- 2025年醫藥流通企業供應鏈優化與成本控制智能供應鏈管理供應鏈管理采購管理優化報告
- 保險競賽題庫及答案詳解
- 自然照護理念體位管理
- 《關稅政策解析》課件
- 武漢網約車從業資格證考試題庫及答案
- 鋁粉交易居間協議合同
- 耐高溫有機硅樹脂合成及改性技術
- 竹編非遺面試題及答案
- 國家開放大學漢語言文學本科《中國現代文學專題》期末紙質考試第三大題分析題庫2025春期版
- 離婚協議書 標準版電子版(2025年版)
- 2024北京市昌平區中考真題生物+答案
- 手術室醫療垃圾的分類
- 教育領域中的信息化技術討論以小學數為例
評論
0/150
提交評論