




下載本文檔
版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
概要依賴管理基本套路輸入源轉換操作輸出操作持久化操作依賴管理依賴<dependency><groupId>
.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version></dependency>Source相關依賴部分source相關依賴現在已經單獨打包,需要單獨引入基本套路//1、參數處理if
(args.length
<
2)
{System.err.println("Usage:
NetworkWordCount
<hostname>
<port>")System.exit(1)}//2、初始化StreamingContextval
sparkConf
=
new
SparkConf().setAppName("NetworkWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(1))//3、從source獲取數據創建DStreamval
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)//4、對DStream進行val
words
=
lines.flatMap(_.split("
"))val
wordCounts=
words.map(x
=>
(x,
1)).reduceByKey(_+
_)//5、處理計算結果
wordCounts.print()//6、啟動Spark
Streamingssc.start()ssc.awaitTermination()輸入源Dstream輸入源---input
DStreamSpark內置了兩類Source:Source分類舉例說明Basic
sourcesfile
systems,
socketconnections,
and
AkkaactorsStreamingContext
直接就可以創建,無需引入額外的依賴Advanced
sourcesKafka,
Flume,Kinesis,,
etc需要引入相關依賴,并且需要通過相關的工具類來創建val
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)import
.apache.spark.streaming.kafka._val
kafkaStream
=
KafkaUtils.createStream(streamingContext,
[ZK
quorum],
[consumer
group
id],
[per-topic
number
of
Kafka
partitions
to
consume])Dstream輸入源---Receiverinput
Dstream都會關聯一個Receiver(除了FileInputDStream)Receiver以任務的形式運行在應用的執行器進程中,從輸入源收集數據并保存為RDD。Receiver收集到輸入數據后會把數據
到另一個執行器進程來保障容錯性(默認行為)Receiver會消耗額外的cpu資源,所以要注意分配
的cpu
cores(receiver是一個單獨的task,會消耗cpu)local模式下不要“local”or
“local[1]”(需要指定多個,只有一個核會卡?。┓植际竭\行時,分配的cores >
receivers的數量StreamingContext
會周期性地運行Spark
作業來處理這些數據(每接受一批次數據,就會提交作業運行處理),把數據與之前時間區間中的RDD進行整合(如果是時間窗口,需要與其它RDD做運算整合)內置的input
Dstream:Basic
Sources內置input
Dstream–
/apache/spark/tree/v1.6.2/external(高級)文件流val
logData
=
ssc.textFileStream(logDirectory)Spark
支持從任意Hadoop
兼容的文件系統中
數據,
Spark
Streaming
也就支持從任意Hadoop
兼容的文件系統
中的文件創建數據流(InputFormat參數化)ssc.fileStream[LongWritable,
IntWritable,SequenceFileInputFormat[LongWritable,
IntWritable]](inputDirectory).map
{case
(x,
y)
=>
(x.get(),
y.get())}文件必須原子化創建(比如把文件移入Spark
的
,而不是一條條往已有文件寫數據)Akka
actor流(spark
底層使用akka通信)內置的input
Dstream:Advanced
SourcesApache
Kafkadef
main(args:
Array[String])
{if
(args.length
<
4)
{System.err.println("Usage:KafkaWordCount
<zkQuorum><group>
<topics>
<numThreads>")System.exit(1)}#使用Array接受args參數val
Array(zkQuorum,
group,
topics,
numThreads)
=
argsval
sparkConf
=
new
SparkConf().setAppName("KafkaWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(2))#指定hdfs
,作用:容錯ssc.checkpoint("checkpoint")val
topicMap
=
topics.split(",").map((_,
numThreads.toInt)).toMapval
lines
=
KafkaUtils.createStream(ssc,
zkQuorum,
group,
topicMap).map(_._2)val
words
=
lines.flatMap(_.split("
"))val
wordCounts
=
words.map(x
=>
(x,
1L)).reduceByKeyAndWindow(_
+
_,_
-
_,Minutes(10),
Seconds(2),
2)wordCounts.print()ssc.start()ssc.awaitTermination()}/apache/spark/examples/st
/apache/spark/tree/v1.6.2/examples/src/main/scala/
reamingDstream輸入源:multiple
input
DStreammultiple
input
streams(same
type
and
same
slideduration)//相同的類型,相同的滑動窗口ssc.union(Seq(stream1,stream2,…))
//合并多個streamstream.union(otherStream)//兩個stream進行合并Dstream輸入源:Custom
ReceiverCustom
input
Dstream–
.apache.spark.streaming.receiver.Receiver(只需要擴展Receiver)–無狀態轉換操作和Sparkcore的語義?一致無狀態轉化操作就是把簡單的RDD
轉化操作應用到每個批次上,也就是轉化DStream中的每一個RDD(對Dstream的操作會
到每個批次的RDD上)無狀態轉換操作不會跨多個batch的RDD去執行(每個批次的RDD結果不能累加)有狀態轉換操作1-updateStateByKey有時 需要在DStream
中跨所有批次狀態(例如用戶的會話)。針對這種情況,updateStateByKey()
為 提供了對一個狀態變量的 ,用于鍵值對形式的Dstream使用updateStateByKey需要完成兩步工作:定義狀態:可以是任意數據類型定義狀態更新函數-updateFuncupdate(events,
oldState)events:是在當前批次中收到的事件的列表(可能為空)。–oldState:是一個可選的狀態對象,存放在Option
內;如果一個鍵沒有之前的狀態,這個值可以空缺。newState:由函數返回,也以Option
形式存在;
可以返回一個空的Option
來表示想要刪除該狀態。注意:有狀態轉化操作需要在你的StreamingContext
中打開檢查點機制來確保容錯性–
ssc.checkpoint("hdfs://...")有狀態轉換操作2-window基于窗口的操作會在一個比StreamingContext
的批次間隔更長的時間范圍內,通過整合多個批次的結果,計算出整個窗口的結果所有基于窗口的操作都需要兩個參數,分別為windowDuration以及slideDuration,兩者都必須是StreamContext
的批次間隔的整數倍valaccessLogsWindow
=
accessLogsDStream.window(Seconds(30),
Seconds(10))val
windowCounts
=
accessLogsWindow.count()batchDuration(每個批次的長度)val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))windowDuration(每次移動,窗口框住的長度(幾個批次))長控制每次計算最近的多少個批次的數據(windowDuration/batchDuration)slideDuration(每次移動的距離(
幾個批次))默認值與batchDuration相等(默認滑動一個batch)控制多長時間計算一次有狀態轉換操作2-window操作代碼片段val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))…val
accessLogsWindow=
accessLogsDStream.window(Seconds(30),
Seconds(20))val
windowCounts
=
accessLogsWindow.count()..窗口時長為3個批次,滑動步長為2個批次;每隔2個批次就對前3
個批次的數據進行一次計算有狀態轉換操作2-window操作—普通規約與增量規約增量規約只考慮新進入窗口的數據和離開窗口的數據,讓Spark增量計算歸約結果。這種特殊形式需要提供歸約函數的一個逆函數,比如+對應的逆函數為-有狀態轉換操作2-window操作—理解增量規約DStream輸出常見輸出操作print每個批次中抓取DStream
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2019-2025年中國牛膝市場運行態勢及行業發展前景預測報告
- 中國汽車傳動軸行業市場運營現狀及投資規劃研究建議報告
- 開關控制裝備項目投資可行性研究分析報告(2024-2030版)
- 2025年 常寧市市級機關遴選考試筆試試題附答案
- 漁船避風港口航道疏浚采砂工程項目可行性研究報告
- 中國樓宇工程市場競爭態勢及行業投資潛力預測報告
- 中國晶圓制造設備市場全面調研及行業投資潛力預測報告
- 中國水產品干腌加工行業市場調查研究及投資前景預測報告
- 玻璃調棒行業深度研究分析報告(2024-2030版)
- 中國高密度聚乙烯薄膜行業調查報告
- 三生事業六大價值
- 鋯石基本特征及地質應用
- 絲網除沫器小計算
- 制缽機的設計(機械CAD圖紙)
- 學校財務管理制度
- 三年級下冊美術課件-第15課色彩拼貼畫|湘美版(共11張PPT)
- 水稻病蟲統防統治工作總結
- 水在不同溫度下的折射率、粘度和介電常數
- howdoyoucometoschoolPPT課件
- 四柱特高弟子班絕密資料——席學易
- 廣安市教育局文件材料歸檔范圍及保管期限表
評論
0/150
提交評論