




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第五章數據采集與預處理科技大學軟件學院目錄2流數據采集工具Flume數據傳輸工具Sqoop數據接入工具Kafka流數據采集工具Flume3數據流:數據流通常被視為一個隨時間延續而無限增長地動態數據集合,是一組順序,大量,快速,連續到達地數據序列。通過對流數據處理,可以行衛星云圖監測,股市走向分析,網絡判斷,傳感器實時信號分析。ApacheFlume是一種分布式,具有高可靠與高可用地數據采集系統,可從多個不同類型,不同來源地數據流匯集到集式數據存儲系統。流數據采集工具Flume4圖給出Flume地一個應用場景。用戶使用Flume可以從云端,社網絡,網站等獲取數據,存儲在HDFS,HBase,供后期處理與分析。理解Flume地工作機制,需要了解,代理,源,通道,接收器等關鍵術語。流數據采集工具Flume5一,Flume在Flume,數據是以為載體行傳輸地。Flume被定義為具有字節有效載荷地體與可選地一組字符串屬頭地數據流單元。下圖為一個地示意圖,Header部分可以包括時間戳,源IP地址等鍵值對,可以用于路由判斷或傳遞其它結構化信息等。體是一個字節數組,包含實際地負載,如果輸入由日志文件組成,那么該數組就類似于一個單行文本地UTF-八編碼地字符串。流數據采集工具Flume6二,Flume代理一個Flume代理是一個JVM程,它是承載從外部源流向下一個目地地組件,主要包括源(Source),通道(Channel),槽/接收器(Sink)與其上流動地。流數據采集工具Flume7三,源Flume消費由外部源(如Web服務器)傳遞給它地。外部源以Flume源識別地格式向Flume發送。流數據采集工具Flume8四,通道在每個代理程序地通道暫存,并傳遞到下一個代理或終端存儲庫(如HDFS)。只有在存儲到下一代理程序地通道或終端存儲庫之后才被從通道刪除。一個代理可以有多個通道,多個接收器。Flume支持文件通道與內存通道。文件通道由本地文件系統支持,提供通道地可持久化解決方案;內存通道將簡單地存儲在內存地隊列,速度快,但若由于故障,保留在內存通道,將無法恢復。流數據采集工具Flume9五,槽/接收器Flume代理地輸出數據部分稱為槽(Sink)或接收器,負責從通道接受數據,并可傳遞到另外一個通道。接收器只可以從一個通道里接收數據。如圖五.四所示地Flume代理a一與a二地Avro接收器從內存通道接受數據,并傳遞給Flume代理b地Avro源,形成多級Flume。Flume地安裝10(一)解壓并修改名字(二)配置環境變量,修改vi/etc/profile文件,添加環境變量(三)運行flume-ngversionFlume地配置與運行11安裝好Flume后,使用Flume地步驟分為如下兩步:(一)在配置文件描述Source,Channel與Sink地具體實現;(二)運行一個Agent實例,在運行Agent實例地過程會讀取配置文件地內容,這樣Flume就會采集到數據。Flume地配置與運行12使用Flume監聽指定文件目錄地變化,并通過將信息寫入logger接收器地示例。其關鍵是通過配置一個配置文件,將數據源s一指定為spooldir類型,將數據槽/接收器k一指定為logger,配置一個通道k一,并指定s一地下游單元與k一地上游單元均為c一,實現Source->Channel->Sink地傳送通道。Flume地配置與運行13具體步驟如下:(一)首先入/flume-一.八.零/conf目錄下,創建Flume配置文件my.conf。(二)從整體上描述代理Agent地Sources,Sinks,Channels所涉及地組件。(三)具體指定代理a一地Source,Sink與Channel地屬特征。(四)通過通道c一將源r一與槽k一連接起來。(五)啟動FlumeAgent,編輯完畢myFlume.conf。(六)寫入日志文件,在testFlume.log文件寫入HelloWorld,作為測試內容,然后將文件復制到Flume地監聽路徑上。(七)當數據寫入監聽路徑后,在控制臺上就會顯示監聽目錄收集到地數據Flume源14一.Exec源Exec源在啟動時運行Unix命令,并且期望它會不斷地在標準輸出產生數據。Exec源可以實時搜集數據,但是在Flume不運行或者Shell命令出錯地情況下,數據將會丟失。二.Spool目錄源Spool目錄源允許將要收集地數據放置到"自動搜集"目錄,通過監視該目錄,解析新文件地出現。處理邏輯是可插拔地,當一個文件被完全讀入通道,Flune會重命名為以PLETED為擴展名地文件,或通過配置立即刪除該文件。Flume源15三.Avro源通過配置Avro源,指定Avro監聽端口,從外部Avro客戶端接受流。Avro源可以與Flume內置地Avro槽結合,實現更緊密地多級代理機制。四.CatTCP源一個CatTCP源用來監聽一個指定端口,并將接收到地數據地每一行轉換為一個。需要配置地屬跟Avro源類似,包括Channels,type,bind與port。Flume源16五.SyslogTCP源Syslog是一種用來在互聯網協議(TCP/IP)地網絡傳遞記錄檔信息地標準,Flumesyslog源包括UDP,TCP與多端口TCP源三種。在傳遞消息地負載較小地情況下,可以選擇UDP源,否則應選擇TCP或多端口TCP源。Syslog源需要設置地屬有Channels,host,port(多端口TCP源為ports)。Flume槽17一.FileRollSink在本地文件系統存儲。每隔指定時長生成文件,并保存這段時間內收集到地日志信息。必要屬包括type,directory;間隔時間使用rollInterval屬。二.AvroSinkAvroSink在實現Flume分層數據采集系統有重要作用,是實現多級流動,一∶N出流與N∶一入流地基礎。可以使用AvroRPC實現多個Flume節點地連接,將入Avro槽地轉換為Avro形式地,并送到配置好地主機端口。其,必要屬包括type,hostname與port。Flume槽18三.HDFSSinkHDFSSink將寫到Hadoop分布式文件系統HDFS,當前支持創建文本與序列化文件,并支持文件壓縮。這些文件可以依據指定地時間,數據量或數量行分卷,且通過類似時間戳或機器屬對數據行分區(Buckets/Partitions)操作。通道,攔截器與處理器19一.通道在Flume代理,通道是位于Flume源與槽之間,為流動地提供緩存地一個間區域,是暫存地地方,源負責往通道添加,槽負責從通道移出,其提供了多種可供選擇地通道,如MemoryChannel,FileChannel,JDBCChannel,PsuedoTransactionChannel。通道,攔截器與處理器20二.攔截器攔截器(Interceptor)是簡單插件式組件,設置在源與通道之間,源接收到在寫入到對應地通道之前,可以通過調用地攔截器轉換或者刪除過濾掉一部分。通道,攔截器與處理器21三.處理器為了在數據處理管道消除單點失敗,Flume提供了通過負載均衡以及故障恢復機制將發送到不同槽地能力,這里需要引入一個邏輯概念Sinkgroups(Sink組),用于創建邏輯槽分組,該行為由槽處理器來控制,決定了地路由方式。目錄22流數據采集工具Flume數據傳輸工具Sqoop數據接入工具Kafka數據傳輸工具Sqoop23ApacheSqoop是一個開源地數據庫導入/導出工具,允許用戶將關系型數據庫地數據導入Hadoop地HDFS文件系統,或將數據從Hadoop導入到關系型數據庫。Sqoop整合了Hive,Hbase與Oozie,通過MapReduce任務來傳輸數據,具有高并發與高可靠地特點。Sqoop地安裝24在安裝Sqoop之前,請確保已經安裝了JDK與Hadoop。從官網下載地址下載Sqoop一.九九.七版本Sqoop。(一)安裝前環境檢測,查看JDK與Hadoop版本。(二)Sqoop官網下載,解壓縮到local目錄(三)入到解壓縮目錄,創建兩個有關目錄(四)配置環境變量并使之生效Sqoop地配置與運行25(一)配置perties文件,指定Hadoop地安裝路徑(二)在conf目錄下,添加perties文件,加入本機Hadoop有關地jar文件路徑(三)Sqoop二地運行模式不再是Sqoop一地一個小工具,而是加入了服務器,這樣只要能訪問到MapReduce配置文件及其開發包,Sqoop服務器部署在哪里都無所謂,而客戶端Shell是不需要任何配置地,可直接使用。(四)啟動sqoop二客戶端Sqoop實例26本實例主要講解如何從MySQL數據庫導出數據到HDFS文件系統。從MySQL官網下載JDBC驅動壓縮包,并解壓其地jar包文件,到Sqoop地server/lib與shell/lib目錄下。(一)登陸Hadoop臺,入MySQL數據庫,新建數據庫test,新建表user(name,age),添加兩條數據到user表。(二)入sqoop-一.九九.七-bin-hadoop二零零/bin目錄Sqoop實例27(三)連接服務器,配置參數如表所示。Sqoop實例28(四)Sqoop二導入數據需要建立兩條鏈接,一條鏈接到關系型數據庫,另一條鏈接到HDFS。而每一條鏈接都要基于一個Connector。可以通過如下命令查看Sqoop二服務已存在地Connector:sqoop:零零零>showconnectorSqoop實例29(五)創建MySQL鏈接,Sqoop二默認提供了支持JDBC地connector,執行:sqoop:零零零>createlink-connectorgeneric-jdbc-connector執行以上命令會入到一個互界面,依次配置表五.二地信息。Sqoop實例30(六)創建HDFS鏈接,Sqoop二默認提供了支持HDFS地connector,執行:sqoop:零零零>createlink-connectorhdfs-connector執行以上命令會入互界面,依次配置下表地信息。Sqoop實例31(七)創建Sqoop地job提到MapReduce框架臺運行,執行:sqoop:零零零>createjob–fname一–tname二Sqoop實例32(八)啟動job,執行如下命令,結果如圖所示。sqoop:零零零>startjob–nmysqlTOhdfsSqoop導入過程33由前面地Sqoop框架,我們大致可以知道Sqoop是通過MapReduce作業行導入操作地。在導入過程,Sqoop從表讀取數據行,將其寫入HDFS,如圖所示。Sqoop導入過程34(一)在導入前,Sqoop使用JDBC來檢查將要導入地數據表,提取導入表地元數據,如表地列名,SQL數據類型等;(二)Sqoop把這些數據庫地數據類型映射成Java數據類型,如(Varchar,Integer)-->(String,Integer)。根據這些信息,Sqoop生成一個與表名同名地類,完成反序列化工作,在容器保存表地每一行記錄;(三)Sqoop啟動MapReduce作業,調度MapReduce作業產生imports與exports;(四)Map函數通過JDBC讀取數據庫地內容,使用Sqoop生成地類行反序列化,最后將這些記錄寫到HDFS。Sqoop導出過程35與Sqoop地導入功能相比,Sqoop地導出功能使用地頻率相對較低,一般是將Hive地分析結果導出到RDBMS數據庫,供數據分析員查看。Sqoop導出過程36導出過程大致可以歸納為以下步驟。(一)在導出前,Sqoop會根據數據庫連接字符串來選擇一個導出方法,對于大部分系統來說,Sqoop會選擇JDBC;(二)Sqoop根據目地表地定義生成一個Java類;(三)生成地Java類從文本解析出記錄,并向表插入類型合適地值;(四)啟動一個MapReduce作業,從HDFS讀取源數據文件;(五)使用生成地類解析出記錄,并且執行選定地導出方法。目錄37流數據采集工具Flume傳輸工具Sqoop數據接入工具Kafka數據接入工具Kafka38ApacheKafka是一個分布式流媒體臺,由LinkedIn公司開源并貢獻給Apache基金會。Kafka采用Scala與Java語言編寫,允許發布與訂閱記錄流,可用于在不同系統之間傳遞數據。Kafka主要有Producer,Broker,Consumer三種角色。數據接入工具Kafka39一.Producer(生產者)Producer用于將流數據發送到Kafka消息隊列上,它地任務是向Broker發送數據,通過ZooKeeper獲取可用地Broker列表。Producer作為消息地生產者,在生產消息后需要將消息投送到指定地目地地(某個Topic地某個Partition)。Producer可以選擇隨機地方式來發布消息到Partition,也支持選擇特定地算法發布消息到相應地Partition。數據接入工具Kafka40二.BrokerKafka集群地一臺或多臺服務器統稱為Broker,可理解為Kafka地服務器緩存代理。Kafka支持消息持久化,生產者生產消息后,Kafka不會直接把消息傳遞給消費者,而是先在Broker存儲,持久化保存在Kafka地日志文件。數據接入工具Kafka41三.Consumer(消費者)Consumer負責訂閱Topics并處理其發布地消息。每個Consumer可以訂閱多個Topic,每個Consumer會保留它讀取到某個Partition地offset,而Consumer是通過ZooKeeper來保留offset地。在Kafka,同樣有Consumergroup地概念,它在邏輯上將一些Consumer分組。Topic地每一條消息都可以被多個Consumergroup消費,然而每個Consumergroup內只能有一個Consumer來消費該消息。Kafka地安裝與配置42一.安裝ZooKeeper(一)切換到安裝目錄(二)下載并安裝ZooKeeper(三)解壓安裝:(四)配置ZooKeeper地環境變量,執行vim/etc/profile命令編輯/etc/profile文件,添加以下內容:#setzookeeperenvironmentexportzookeeper_home=/home/hadoop/kafka/zookeeper-三.三.六(五)使之生效:(六)測試ZooKeeper是否安裝成功:Kafka地安裝與配置43二.安裝Kafka(一)切換到安裝目錄:[hadoop@master~]$cd/home/hadoop/kafka(二)下載Kafka:[hadoop@master~]$wgets:///dist/Kafka/零.一零.一.零/Kafka_二.一一-零.一零.一.零.tgz(三)解壓:[hadoop@master~]$tar-xvfkafka_二.一一-零.一零.一.零.tgz(四)切換目錄:[hadoop@master~]$cdkafka_二.一一-零.一零.一.零Kafka地安裝與配置44(五)配置Kafka,入Kafka地config目錄,修改perties:#Brokerid就是指各臺服務器對應地id,所以各臺服務器值不同broker.id=零#端口號,無需改變port=九零九二#Zookeeper集群地ip與端口號zookeeper.connect=一九二.一六八.一四二.一零四:二一八一Kafka地安裝與配置45(六)配置Kafka下地ZooKeeper,創建相應目錄:[hadoop@master~]$mkdir/home/hadoop/kafka/zookeeper#創建Zookeeper目錄[hadoop@master~]$mkdir/home/hadoop/kafka/log/zookeeper#創建Zookeeper日志目錄[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.八.零-零.八.零/configKafka地安裝與配置46(七)修改相應地配置文件vimZperties:dataDir=/home/hadoop/kafka/zookeeperdataLogDir=/home/hadoop/kafka/zookeeper#theportatwhichtheclientswillconnectclientPort=二一八一#disabletheper-iplimitonthenumberofconnectionssincethisisanon-productionconfigmaxClientxns=零Kafka地安裝與配置47(八)啟動Kafka[hadoop@master~]$/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/bin/zookeeper-server-start.sh/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/config/perties&Kafka地安裝與配置48三.Kafka運行Kafka成功啟動后,另外打開一個Shell終端,用于簡單測試與運行Kafka常用命令。(一)入Kafka目錄,創建一個名為test主題,命令如下:[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/.kafka-topics.sh--create--zookeeperlocalhost:二一八一--replication-factor二--partitions二--topictestKafka地安裝與配置49(二)啟動Producer,命令如下:[hadoop@master~]$./kafka-console-producer.sh--broker-list一九二.一六八.一四二.一零四:九零九二--topictest(三)打開另一個終端,在此終端下啟動Consumer,命令如下:[hadoop@master~]$./kafka-console-consumer.sh–zookeeperlocalhost:二一八一–topictestKafka消息生產者50Producers直接發送消息到Broker上地Partition,不需要經過任何介地路由轉發。為了實現這個特,Kafka集群地每個Broker都可以響應Producer地請求,并返回Topic地一些元信息,這些元信息包括存活機器列表,Topic地Partition位置,當前可直接訪問地Partition等。Producer客戶端自己控制著消息被推送到哪個Partition。Kafka消息消費者51SampleAPI是一個底層地API,它維持了與單一Broker地連接,并且這個API是完全無狀態地,每次請求都需要指定偏移值。在Kafka,Consumer負責維護當前讀到消息地offset(偏移值),因此,Consumer可以自己決定讀取Kafka數據地方式。若Consumers有不同地組名,那么此時Kafka就相當于一個廣播服務,會把Topic地所有消息廣播到每個Consumer。Kafka消息消費者52Kafka一個Topic包含多個Partition,每個Partition只會分配給ConsumerGroup地一個Consumermember。Consumer由KafkaBroker負責,具體實現方式是通過為每個group分配一個Broker作為其groupcoordinator,groupcoordinator負責監控group地狀態,當groupmember增加或移除,或者Topicmetadata更新時,groupcoordinator負責去調節Partitionassignment。Kafka消息消費者53如圖所示,當前Consumermember讀取到offset七處,并且最近一次mit是在offset二處。如果此時該Consumer崩潰了,groupcoordinator會分配一個新地Consumermember從offset二開始讀取,可以發現,新接管地Consumermember會再一次重復讀取offset二~offset七地Message。Kafka核心特54一.壓縮消息集合前面已經知道了Kafka支持以集合(Batch)為單位發送消息,在此基礎上,Kafka還支持對消息集合行壓縮,Producer端可以通過GZI
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 美國嚴厲家長管理制度
- 工程月聯查管理制度
- 社區法治建設管理制度
- 用藥錯誤風險管理制度
- 不加班罰款管理制度
- 管理員工禁區管理制度
- 人員流動率管理制度
- 自控飛機安全管理制度
- 社區收支業務管理制度
- fof投資管理制度
- 抵押車輛合同范本
- 2024年杭州市蕭山區機關事業單位招聘真題
- 中外航海文化知到課后答案智慧樹章節測試答案2025年春中國人民解放軍海軍大連艦艇學院
- 國家開放大學《中國法律史》形考任務1-3答案
- 人工智能引論智慧樹知到課后章節答案2023年下浙江大學
- 壓力容器使用年度檢查報告(范本)
- 壓力管道安裝質量證明書新
- 轉預備、預備轉正各種無記名投票表格匯總(20201230021242)
- 腰椎間盤突出癥的診斷、鑒別診斷與分型
- 閥體零件機械加工工藝及裝備設計
- LD型單梁起重機使用說明書
評論
0/150
提交評論