




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
1、大講臺分享:五種基于 MapReduce 的并行計算框架介紹及性能測試當使用 Hadoop 技術架構集群,集群內新增、刪除節點,或者某個節點機器內硬盤存儲達到飽和值時,都會造成集群內數據分布不均勻、數據丟失風險增加等問題出現。本文對 HDFS 內部的數據平衡方式做了介紹,通過實驗案例的方式向讀者解釋內部數據平衡的解決辦法。并行計算模型和框架目前開源社區有許多并行計算模型和框架可供選擇,按照實現方式、運行機制、依附的產品生態圈等可以被劃分為幾個類型,每個類型各有優缺點,如果能夠對各類型的并行計算框架都進行深入研究及適當的缺點修復,就可以為不同硬件環境下的海量數據分析需求提供不同的軟件層面的解決方
2、案。· 并行計算框架并行計算或稱平行計算是相對于串行計算來說的。它是一種一次可執行多個指令的算法,目的是提高計算速度,以及通過擴大問題求解規模,解決大型而復雜的計算問題。所謂并行計算可分為時間上的并行和空間上的并行。時間上的并行就是指流水線技術,而空間上的并行則是指用多個處理器并發的執行計算。并行計算(Parallel Computing)是指同時使用多種計算資源解決計算問題的過程,是提高計算機系統計算速度和處理能力的一種有效手段。它的基本思想是用多個處理器來協同求解同一問題,即將被求解的問題分解成若干個部分,各部分均由一個獨立的處理機來并行計算。并行計算系統既可以是專門設計的、含有
3、多個處理器的超級計算機,也可以是以某種方式互連的若干臺的獨立計算機構成的集群。通過并行計算集群完成數據的處理,再將處理的結果返回給用戶。· 國內外研究歐美發達國家對于并行計算技術的研究要遠遠早于我國,從最初的并行計算逐漸過渡到網格計算,隨著 Internet 網絡資源的迅速膨脹,因特網容納了海量的各種類型的數據和信息。海量數據的處理對服務器 CPU、IO 的吞吐都是嚴峻的考驗,不論是處理速度、存儲空間、容錯性,還是在訪問速度等方面,傳統的技術架構和僅靠單臺計算機基于串行的方式越來越不適應當前海量數據處理的要求。國內外學者提出很多海量數據處理方法,以改善海量數據處理存在的諸多問題。目前
4、已有的海量數據處理方法在概念上較容易理解,然而由于數據量巨大,要在可接受的時間內完成相應的處理,只有將這些計算進行并行化處理,通過提取出處理過程中存在的可并行工作的分量,用分布式模型來實現這些并行分量的并行執行過程。隨著技術的發展,單機的性能有了突飛猛進的發展變化,尤其是內存和處理器等硬件技術,但是硬件技術的發展在理論上總是有限度的,如果說硬件的發展在縱向上提高了系統的性能,那么并行技術的發展就是從橫向上拓展了處理的方式。2003 年美國 Google 公司對外發布了 MapReduce、GFS、BigData 三篇論文,至此正式將并行計算框架落地為 MapReduce 框架。我國的并行和分布
5、式計算技術研究起源于 60 年代末,按照國防科技大學周興銘院士提出的觀點,到目前為止已經三個階段了。第一階段,自 60 年代末至 70 年代末,主要從事大型機內的并行處理技術研究;第二階段,自 70 年代末至 90 年代初,主要從事向量機和并行多處理器系統研究;第三階段,自 80 年代末至今,主要從事 MPP(Massively Parallel Processor) 系統研究。盡管我國在并行計算方面開展的研究和應用較早,目前也擁有很多的并行計算資源,但研究和應用的成效相對美國還存在較大的差距,有待進一步的提高和發展。MapReduceMapReduce 是由谷歌推出的一個編程模型,是一個能處
6、理和生成超大數據集的算法模型,該架構能夠在大量普通配置的計算機上實現并行化處理。MapReduce 編程模型結合用戶實現的 Map 和 Reduce 函數。用戶自定義的 Map 函數處理一個輸入的基于 key/value pair 的集合,輸出中間基于 key/value pair 的集合,MapReduce 庫把中間所有具有相同 key 值的 value 值集合在一起后傳遞給 Reduce 函數,用戶自定義的 Reduce 函數合并所有具有相同 key 值的 value 值,形成一個較小 value 值的集合。一般地,一個典型的 MapReduce 程序的執行流程如圖 1 所示。圖 1 .M
7、apReduce 程序執行流程圖MapReduce 執行過程主要包括:1. 將輸入的海量數據切片分給不同的機器處理;2. 執行 Map 任務的 Worker 將輸入數據解析成 key/value pair,用戶定義的 Map 函數把輸入的 key/value pair 轉成中間形式的 key/value pair;3. 按照 key 值對中間形式的 key/value 進行排序、聚合;4. 把不同的 key 值和相應的 value 集分配給不同的機器,完成 Reduce 運算;5. 輸出 Reduce 結果。任務成功完成后,MapReduce 的輸出存放在 R 個輸出文件中,一般情況下,這 R
8、 個輸出文件不需要合并成一個文件,而是作為另外一個 MapReduce 的輸入,或者在另一個可處理多個分割文件的分布式應用中使用。受 Google MapReduce 啟發,許多研究者在不同的實驗平臺上實現了 MapReduce 框架,本文將對 Apache Hadoop MapReduce、Apache、Spark、斯坦福大學的 Phoenix,Nokia 研發的 Disco,以及香港科技大學的 Mars 等 5 個 MapReduce 實現框架進行逐一介紹和各方面對比。· Hadoop MapReduceHadoop 的設計思路來源于 Google 的 GFS 和 MapRedu
9、ce。它是一個開源軟件框架,通過在集群計算機中使用簡單的編程模型,可編寫和運行分布式應用程序處理大規模數據。Hadoop 包含三個子項目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。第一代 Hadoop MapReduce 是一個在計算機集群上分布式處理海量數據集的軟件框架,包括一個 JobTracker 和一定數量的 TaskTracker。運行流程圖如圖 2 所示。圖 2 .Hadoop MapReduce 系統架構圖在最上層有 4 個獨立的實體,即客戶端、JobTracker、TaskTrac
10、ker 和分布式文件系統。客戶端提交 MapReduce 作業;JobTracker 協調作業的運行;JobTracker 是一個 Java 應用程序,它的主類是 JobTracker;TaskTracker 運行作業劃分后的任務,TaskTracker 也是一個 Java 應用程序,它的主類是 TaskTracker。Hadoop 運行 MapReduce 作業的步驟主要包括提交作業、初始化作業、分配任務、執行任務、更新進度和狀態、完成作業等 6 個步驟。· Spark MapReduceSpark 是一個基于內存計算的開源的集群計算系統,目的是讓數據分析更加快速。Spark 非常
11、小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 為主的小團隊所開發。使用的語言是 Scala,項目的核心部分的代碼只有 63 個 Scala 文件,非常短小精悍。Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。Spark 提供了基于內存的計算集群,在分析數據時將數據導入內存以實現快速查詢,“速度比”基于磁盤的系統,如比 Hadoop 快很多。Spark 最初是為了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度可以達到 Hadoop 的幾百倍。· DiscoDisco 是由
12、Nokia 研究中心開發的,基于 MapReduce 的分布式數據處理框架,核心部分由 Erlang 語言開發,外部編程接口為 Python 語言。Disco 是一個開放源代碼的大規模數據分析平臺,支持大數據集的并行計算,能運行在不可靠的集群計算機上。Disco 可部署在集群和多核計算機上,還可部署在 Amazon EC2 上。Disco 基于主/從架構 (Master/Slave),圖 3 總體設計架構圖展示了通過一臺主節點 (Master) 服務器控制多臺從節點 (Slave) 服務器的總體設計架構。圖 3 .Disco 總體架構圖Disco 運行 MapReduce 步驟如下:1. Di
13、sco 用戶使用 Python 腳本開始 Disco 作業;2. 作業請求通過 HTTP 發送到主機;3. 主機是一個 Erlang 進程,通過 HTTP 接收作業請求;4. 主機通過 SSH 啟動每個節點處的從機;5. 從機在 Worker 進程中運行 Disco 任務。· PhoenixPhoenix 作為斯坦福大學 EE382a 課程的一類項目,由斯坦福大學計算機系統實驗室開發。Phoenix 對 MapReduce 的實現原則和最初由 Google 實現的 MapReduce 基本相同。不同的是,它在集群中以實現共享內存系統為目的,共享內存能最小化由任務派生和數據間的通信所造
14、成的間接成本。Phoenix 可編程多核芯片或共享內存多核處理器 (SMPs 和 ccNUMAs),用于數據密集型任務處理。· MarsMars 是香港科技大學與微軟、新浪合作開發的基于 GPU 的 MapReduce 框架。目前已經包含字符串匹配、矩陣乘法、倒排索引、字詞統計、網頁訪問排名、網頁訪問計數、相似性評估和 K 均值等 8 項應用,能夠在 32 位與 64 位的 Linux 平臺上運行。Mars 框架實現方式和基于 CPU 的 MapReduce 框架非常類似,也由 Map 和 Reduce 兩個階段組成,它的基本工作流程圖如圖 4 所示。圖 4 .Mars 基本工作流程
15、圖在開始每個階段之前,Mars 初始化線程配置,包括 GPU 上線程組的數量和每個線程組中線程的數量。Mars 在 GPU 內使用大量的線程,在運行時階段會均勻分配任務給線程,每個線程負責一個 Map 或 Reduce 任務,以小數量的 key/value 對作為輸入,并通過一種無鎖的方案來管理 MapReduce 框架中的并發寫入。Mars 的工作流程主要有 7 個操作步驟:1. 在主存儲器中輸入 key/value 對,并將它們存儲到數組;2. 初始化運行時的配置參數;3. 復制主存儲器中的輸入數組到 GPU 設備內存;4. 啟動 GPU 上的 Map 階段,并將中間的 key/value
16、 對存儲到數組;5. 如果 mSort 選擇 F,即需要排序階段,則對中間結果進行排序;6. 如果 noReduce 是 F,即需要 Reduce 階段,則啟動 GPU 上的 Reduce 階段,并輸出最終結果,否則中間結果就是最終結果;7. 復制 GPU 設備存儲器中的結果到主存儲器。上述步驟的 1,2,3,7 這四個步驟的操作由調度器來完成,調度器負責準備數據輸入,在 GPU 上調用 Map 和 Reduce 階段,并將結果返回給用戶。五種框架的優缺點比較表 1. 五種框架優缺點比較Hadoop MapReduceSparkPhoenixDiscoMars編程語言Java 為主ScalaC
17、ErlangC+構建平臺需要首先架構基于 Hadoop 的集群系統,通過 HDFS 完成運算的數據存儲工作可以的單獨運行,也可以與 Hadoop 框架完整結合獨立運行,不需要提前部署集群,運行時系統的實現是建立在 PThread 之上的,也可方便地移植到其他共享內存線程庫上整個 Disco 平臺由分布式存儲系統 DDFS 和 MapReduce 框架組成,DDFS 與計算框架高度耦合,通過監控各個節點上的磁盤使用情況進行負載均衡運行時為 Map 或 Reduce 任務初始化大量的 GPU 線程,并為每個線程自動分配少量的 key/value 對來運行任務功能特點計算能力非常強,適合超大數據集的
18、應用程序,但是由于系統開銷等原因,處理小規模數據的速度不一定比串行程序快,并且本身集群的穩定性不高在保證容錯的前提下,用內存來承載工作集,內存的存取速度快于磁盤多個數量級,從而可以極大提升性能利用共享內存緩沖區實現通信,從而避免了因數據復制產生的開銷,但 Phoenix 也存在不能自動執行迭代計算、沒有高效的錯誤發現機制等不足由一個 Master 服務器和一系列 Worker 節點組成,Master 和 Worker 之間采用基于輪詢的通信機制,通過 HTTP 的方式傳輸數據。輪詢的時間間隔不好確定,若時間間隔設置不當,會顯著降低程序的執行性能由于 GPU 線程不支持運行時動態調度,所以給每個
19、 GPU 線程分配的任務是固定的,若輸入數據劃分布均勻,將導致 Map 或 Reduce 階段的負載不均衡,使得整個系統性能急劇降低。同時由于 GPU 不支持運行時在設備內存中分配空間,需要預先在設備內存中分配好輸入數據和輸出數據的存放空間,但是 Map 和 Reduce 階段輸出數據大小是未知的,并且當多個 GPU 線程同時向共享輸出區域中寫數據時,易造成寫沖突WordCount 實驗· 基本原理單詞計數 (WordCount) 是最簡單也是最能體現 MapReduce 思想的程序之一,可以稱為 MapReduce 版"Hello World"。單詞計數主要完成
20、功能是:統計一系列文本文件中每個單詞出現的次數。· 本次實驗步驟本次實驗的硬件資源基于 x86 服務器 1 臺,配置為內存 32GB DDR3、E5 CPU/12 核、GPU,實驗數據樣本為 10M/50M/100M/500M/1000M 的文本文件五個,我們使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分別運行文本分析程序,基于結果一致的前提下統計出運行時間、運行時 CPU 占有率、運行時內存占有率等數據,并采用這些數據繪制成柱狀圖。Hadoop MapReduce首先需要將文件拆分成 splits,由于測試用
21、的文件較小,所以每個文件為一個 split,并將文件按行分割形成<key,value>對,圖 12 分割過程圖所示。這一步由 MapReduce 框架自動完成,其中偏移量(即 key 值)包括了回車所占的字符數(Windows 和 Linux 環境會不同)。圖 5 . 分割過程圖將分割好的<key,value>對交給用戶定義的 map 方法進行處理,生成新的<key,value>對,圖 6 執行 map 方法所示。圖 6 . 執行 Map 方法過程圖得到 map 方法輸出的<key,value>對后,Mapper 會將它們按照 key 值進行排序
22、,并執行 Combine 過程,將 key 相同的 value 值累加,得到 Mapper 的最終輸出結果。圖 7Map 端排序及 Combine 過程所示。圖 7 . Map 端排序及 Combine 過程Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行處理,得到新的<key,value>對,并作為 WordCount 的輸出結果,圖 15Reduce 端排序及輸出結果所示。圖 8 . Reduce 端排序及輸出結果流程圖清單 1 . 第一代 Hadoop MapReduce WordCount 示例代碼import java.
23、io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.had
24、oop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWrit
25、able>private final static IntWritable one = new IntWritable(1);private Text word = new Text();/ 開始 Map 過程public void map(Object key, Text value, Context context)throws IOException, InterruptedException StringTokenizer itr = new StringTokenizer(value.toString();/遍歷 Map 里面的字符串while (itr.hasMoreToke
26、ns() word.set(itr.nextToken();context.write(word, one);public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> private IntWritable result = new IntWritable();/開始 Reduce 過程public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOExcep
27、tion, InterruptedException int sum = 0;for (IntWritable val : values) sum += val.get();result.set(sum);context.write(key, result); public static void main(String args) throws Exception Configuration conf = new Configuration();String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()
28、;if (otherArgs.length != 2) System.err.println("Usage: wordcount <in> <out>");System.exit(2);Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClas
29、s(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs0);FileOutputFormat.setOutputPath(job, new Path(otherArgs1);System.exit(job.waitForCompletion(true) ? 0 : 1);Spark WordCount 實驗Spark 與 Hadoop MapRed
30、uce 的最大區別是它把所有數據保存在內存中,Hadoop MapReduce 需要從外部存儲介質里把數據讀入到內存,Spark 不需要這一步驟。它的實現原理與 Hadoop MapReduce 沒有太大區別,這里不再重復原理,完整的運行代碼如清單 2 所示。清單 2 . Spark WordCount 示例代碼SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaRDD<
31、;String> lines = ctx.textFile(args0, Integer.parseInt(args1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() Override public Iterable<String> call(String s) return Arrays.asList(SPACE.split(s); ); /定義 RDD ones JavaPairRDD<String, Integer> ones
32、= words.mapToPair(new PairFunction<String, String, Integer>() Override public Tuple2<String, Integer> call(String s) return new Tuple2<String, Integer>(s, 1); ); /ones.reduceByKey(func, numPartitions) JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Int
33、eger, Integer, Integer>() Override public Integer call(Integer i1, Integer i2) return i1 + i2; ,10);/輸出 ListList<Tuple2<String, Integer>> output = counts.collect(); Collections.sort(output, new Comparator<Tuple2<String, Integer>>() Override public int compare(Tuple2<Str
34、ing, Integer> t1,Tuple2<String, Integer> t2) if(t1._2 > t2._2) return -1; else if(t1._2 < t2._2) return 1; return 0; );Disco WordCount 實驗MapReduce 框架由于 Disco 有分布式文件系統存在,所以一般情況下都不會單獨使用,都是從分布式文件系統內取數據后讀入內存,然后再切分數據、進入 MapReduce 階段。首先需要調用 ddfs 的 chunk 命令把文件上傳到 DDFS,然后開始編寫 MapReduce 程序,Dis
35、co 外層應用程序采用 Python 編寫。Map 程序實例如清單 3 所示,Reduce 程序示例如清單 4 所示。清單 3 . Map 程序段def fun_map(line, params): for word in line.split(): yield word, 1清單 4 . Reduce 程序段def fun_reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter): yield word, sum(counts)清單 5 . Map/Reduce
36、任務from disco.core import Job, result_iteratordef map(line, params): for word in line.split(): yield word, 1def reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter): yield word, sum(counts)if _name_ = '_main_': job = Job().run(input="http:/discopr
37、/media/text/chekhov.txt", map=map, reduce=reduce) for word, count in result_iterator(job.wait(show=True): print(word, count)NotePhoenix WordCount 實驗Phoenix 是基于 CPU 的 MapReduce 框架,所以它也是采用將數據分割后讀入內存,然后開始 MapReduce 處理階段這樣的傳統方式。Phoenix 并不由用戶決定切分每個 Map 分配到的數據塊的大小,它是根據集群系統的實際 Cache 大小來切分的,這樣
38、可以避免出現分配到 Map 的數據塊過大或者過小的情況出現。過大的數據快會導致 Map 執行較慢,過小的數據快會導致 Map 資源浪費,因為每次啟動 Map 線程都需要消耗一定的系統資源。Map 階段切分好的文本被多個 Map 并行執行,Phoenix 支持 100 個左右的 Map 并行執行,一個工作節點下可以有若干個 Map 并行執行。只有當一個工作節點上所有的 Map 任務都結束后才開始 Reduce 階段。Reduce 階段繼續沿用了動態任務調度機制,同時允許用戶自定義數據分區規則。清單 6 . Phoenix 的 wordCount 程序段#include <stdio.h&g
39、t;#include <strings.h>#include <string.h>#include <stddef.h>#include <stdlib.h>#include <unistd.h>#include <assert.h>#include <sys/mman.h>#include <sys/stat.h>#include <sys/time.h>#include <fcntl.h>#include <ctype.h>#include <intty
40、pes.h>#include "map_reduce.h"#include "stddefines.h"#include "sort.h"#define DEFAULT_DISP_NUM 10typedef struct int fpos; off_t flen; char *fdata; int unit_size; wc_data_t;enum IN_WORD, NOT_IN_WORD; struct timeval begin, end;#ifdef TIMING unsigned int library_time = 0
41、;#endif/* mystrcmp() * Comparison function to compare 2 words */int mystrcmp(const void *s1, const void *s2) return strcmp(const char *)s1, (const char *) s2);/* mykeyvalcmp() * Comparison function to compare 2 ints */int mykeyvalcmp(const void *v1, const void *v2) keyval_t* kv1 = (keyval_t*)v1; key
42、val_t* kv2 = (keyval_t*)v2; intptr_t *i1 = kv1->val; intptr_t *i2 = kv2->val; if (i1 < i2) return 1; else if (i1 > i2) return -1; else return strcmp(char *)kv1->key, (char *)kv2->key); /return 0; /* wordcount_分割器 () * 內存里面進行 Map 計算 */int wordcount_splitter(void *data_in, int req_un
43、its, map_args_t *out) wc_data_t * data = (wc_data_t *)data_in; assert(data_in); assert(out); assert(data->flen >= 0); assert(data->fdata); assert(req_units); assert(data->fpos >= 0); / End of file reached, return FALSE for no more data if (data->fpos >= data->flen) return 0;
44、/ Set the start of the next data out->data = (void *)&data->fdatadata->fpos; / Determine the nominal length out->length = req_units * data->unit_size; if (data->fpos + out->length > data->flen) out->length = data->flen - data->fpos; / Set the length to end at
45、a space for (data->fpos += (long)out->length; data->fpos < data->flen && data->fdatadata->fpos != ' ' && data->fdatadata->fpos != 't' && data->fdatadata->fpos != 'r' && data->fdatadata->fpos != 'n'
46、 data->fpos+, out->length+); return 1;/* wordcount_locator() * Return the memory address where this map task would heavily access. */void *wordcount_locator (map_args_t *task) assert (task); return task->data;/* wordcount_map() * 對文本進行計數 */void wordcount_map(map_args_t *args) char *curr_sta
47、rt, curr_ltr; int state = NOT_IN_WORD; int i; assert(args); char *data = (char *)args->data; assert(data); curr_start = data; for (i = 0; i < args->length; i+) curr_ltr = toupper(datai); switch (state) case IN_WORD: datai = curr_ltr; if (curr_ltr < 'A' | curr_ltr > 'Z'
48、) && curr_ltr != ''') datai = 0; emit_intermediate(curr_start, (void *)1, &datai - curr_start + 1); state = NOT_IN_WORD; break; default: case NOT_IN_WORD: if (curr_ltr >= 'A' && curr_ltr <= 'Z') curr_start = &datai; datai = curr_ltr; state =
49、IN_WORD; break; / Add the last word if (state = IN_WORD) dataargs->length = 0; emit_intermediate(curr_start, (void *)1, &datai - curr_start + 1); /* wordcount_reduce() * 計算字符 */void wordcount_reduce(void *key_in, iterator_t *itr) char *key = (char *)key_in; void *val; intptr_t sum = 0; assert
50、(key); assert(itr); while (iter_next (itr, &val) sum += (intptr_t)val; emit(key, (void *)sum);void *wordcount_combiner (iterator_t *itr) void *val; intptr_t sum = 0; assert(itr); while (iter_next (itr, &val) sum += (intptr_t)val; return (void *)sum;int main(int argc, char *argv) final_data_t
51、 wc_vals; int i; int fd; char * fdata; int disp_num; struct stat finfo; char * fname, * disp_num_str; struct timeval starttime,endtime; get_time (&begin); / 確保文件名 if (argv1 = NULL) printf("USAGE: %s <filename> Top # of results to displayn", argv0); exit(1); fname = argv1; disp_nu
52、m_str = argv2; printf("Wordcount: Running.n"); / 讀取文件 CHECK_ERROR(fd = open(fname, O_RDONLY) < 0); / Get the file info (for file length) CHECK_ERROR(fstat(fd, &finfo) < 0);#ifndef NO_MMAP / 內存里面開始調用 map CHECK_ERROR(fdata = mmap(0, finfo.st_size + 1, PROT_READ | PROT_WRITE, MAP_PR
53、IVATE, fd, 0) = NULL);#else int ret; fdata = (char *)malloc (finfo.st_size); CHECK_ERROR (fdata = NULL); ret = read (fd, fdata, finfo.st_size); CHECK_ERROR (ret != finfo.st_size);#endif CHECK_ERROR(disp_num = (disp_num_str = NULL) ? DEFAULT_DISP_NUM : atoi(disp_num_str) <= 0); wc_data_t wc_data;
54、wc_data.unit_size = 5; / approx 5 bytes per word wc_data.fpos = 0; wc_data.flen = finfo.st_size; wc_data.fdata = fdata; CHECK_ERROR (map_reduce_init (); map_reduce_args_t map_reduce_args; memset(&map_reduce_args, 0, sizeof(map_reduce_args_t); map_reduce_args.task_data = &wc_data; map_reduce_
55、args.map = wordcount_map; map_reduce_args.reduce = wordcount_reduce; map_reduce_biner = wordcount_combiner; map_reduce_args.splitter = wordcount_splitter; map_reduce_args.locator = wordcount_locator; map_reduce_args.key_cmp = mystrcmp; map_reduce_args.unit_size = wc_data.unit_size; map_reduce_args.p
56、artition = NULL; / use default map_reduce_args.result = &wc_vals; map_reduce_args.data_size = finfo.st_size; map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE");/1024 * 1024 * 2; map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS");/8; map_reduce_args.num_
57、reduce_threads = atoi(GETENV("MR_NUMTHREADS");/16; map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS");/8; map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS");/16; map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR");/2; p
58、rintf("Wordcount: Calling MapReduce Scheduler Wordcountn"); gettimeofday(&starttime,0); get_time (&end);#ifdef TIMING fprintf (stderr, "initialize: %un", time_diff (&end, &begin);#endif get_time (&begin); CHECK_ERROR(map_reduce (&map_reduce_args) < 0); get_time (&end);#ifdef TIMING library_time += time_diff (&end, &begin);#endif get_time (&begin); gettimeofday(&endtime,0); printf("Wordcount: Completed %ldn",(endtime.tv_sec - starttime.tv_sec); printf("Wordcount: MapReduce Completedn"); printf("Wordco
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 重大自然災害中檔案應急管理機制研究
- 主動脈夾層診斷與護理
- 零售行業代收貨款服務條款協議
- 文化創意產業財產抵押貸款協議
- 菜園種植與城市垃圾分類回收合同
- 茶樓茶藝與茶文化主題酒店合作合同范本
- 車庫租賃與停車場綜合管理合同
- 拆遷安置補償居間服務協議書
- 電視劇拍攝現場制片助理勞務合作協議
- 彩鋼房倉儲物流合作項目承包協議
- 2024年石家莊市市屬國有企業招聘筆試真題
- 2024年廣東“三支一扶”計劃招募筆試真題
- 設備租賃方案(3篇)
- 公關費用標準管理制度
- 2025-2030年中國潔凈室風扇過濾單元行業市場現狀供需分析及投資評估規劃分析研究報告
- 2025至2030中國汽車租賃行業發展分析及發展戰略與市場策略報告
- 2025年煙臺市中考地理試卷真題
- 安徽省合肥市名校2025屆八年級英語第二學期期末統考試題含答案
- 2024年廣東省廣州市初中生物會考真題(含答案)
- 2025年河北省中考麒麟卷生物(一)
- 2025初升高數學銜接教材
評論
0/150
提交評論