Hadoop源碼以及流程解析_第1頁
Hadoop源碼以及流程解析_第2頁
Hadoop源碼以及流程解析_第3頁
Hadoop源碼以及流程解析_第4頁
Hadoop源碼以及流程解析_第5頁
已閱讀5頁,還剩9頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、Hadoop源碼以及流程解析 整體結(jié)構(gòu) DN: Configuration,JobClient,JobConf Master:JobTracker ,JoblnProgress,T asklnProgress WN:T askTracker ,Task(MapT ask、ReduceT ask),JvmManager ,Child Jobld Add new job Joblracker I JobInProgress TasklnPrTaskInPro TasklnPro ogressl gress2gress2 fTasklnPro gress2 Ta*lnpo Task 圖1整體結(jié)構(gòu) C

2、lient Configuration 從 Configuration 類的源代碼可以看到,定義了如下私有成員變量: private boolean quietmode = true;/ 第一個是 boolean 型變量 quietmode ,用于設(shè)置加載配置 的模式。通過閱讀源代碼就可以清楚,這個 quietmode 如果為 true ,實(shí)際上默認(rèn)就為 true, 加載配置的模式為快速模式, 其實(shí)也就是在解析配置文件的過程中, 不輸出日志信息, 就這 么簡單。 private ArrayList defaultResources = new ArrayList();/ 它是一個列表,該列表中

3、存放的是配置 文件的名稱 private ArrayList resources = new ArrayList();/全部資源的配置包括 URL 、 String 、 Path、InputStream private Set finalParameters = new HashSet();/ 程序性的 private boolean loadDefaults = true;/ 是否載入默認(rèn)資源 private static final WeakHashMap REGISTRY = new WeakHashMap();/ private Properties properties;/ 個人程序

4、所需要的所有配置會以 Properties 的形式存儲 private Properties overlay;/ 它也是一個 Properties 變量。它對應(yīng)于 finalResources 列表,也就 是解析 finalResources 列表中設(shè)置的配置文件,配置項(xiàng)設(shè)置到 overlay 中。這里, overlay 比 較關(guān)鍵的一點(diǎn)就是,如果 overlay 不為空屬性配置,在創(chuàng)建一個 Configuration 實(shí)例的時候會 檢查 overlay ,不空就將其中的配置項(xiàng)加入到properties 中 private ClassLoader classLoader;/ 類加載器 在這里所

5、有客戶端程序中配置的類的信息和其他運(yùn)行信息,都會保存在這個類里。 JobClient JobClient.runJob(job) 靜 態(tài) 方法 會實(shí)例 化 一 個 JobClient 實(shí) 例 ,然 后用此 實(shí) 例 的 submitJob(job) 方法向 master 提交作業(yè)。 此方法會返回一個 RunningJob 對象, 它用來跟蹤作 業(yè)的狀態(tài)。作業(yè)提交完畢后, JobClient 會根據(jù)此對象開始輪詢作業(yè)的進(jìn)度,直到作業(yè)完成。 submitJob(job) 內(nèi) 部 是 通 過 submitJobInternal(job) 方 法 完 成 實(shí) 質(zhì) 性 的 作 業(yè) 提 交 。 submit

6、JobInternal(job)方法首先會向hadoop分布系統(tǒng)文件系統(tǒng) hdfs依次上傳三個文件:job.jar, job.split 和 job.xml 。 job.xml: 作業(yè)配置,例如 Mapper, Combiner, Reducer 的類型,輸入輸出格式的類型等。 job.jar: jar 包,里面包含了執(zhí)行此任務(wù)需要的各種類,比如Mapper,Reducer 等實(shí)現(xiàn)。 job.split:文件分塊的相關(guān)信息,比如有數(shù)據(jù)分多少個塊,塊的大小(默認(rèn)64m)等。 這三個文件在 hdfs 上的路徑由 hadoop-default.xml 文件中的 mapreduce 系統(tǒng)路徑 mapr

7、ed.system.dir 屬性 + jobId 決定。 mapred.system.dir 屬性默認(rèn)是 /tmp/hadoop-user_name/ mapred/system。寫完這三個文件之后 ,此方法會通過 RPC調(diào)用 master節(jié)點(diǎn)上的 JobTracker. submitJob(job) 方法,此時作業(yè)已經(jīng)提交完成。 關(guān)鍵代碼流程解析: jobClient.submit(); 調(diào)用 jobClient.submitJobInternal(conf); 在這個函數(shù)中利用 jobId 建立提 交根路徑, jar 文件路徑, job 分割文件的路徑, job.xml 路徑。代碼如下:

8、JobID jobId = jobSubmitClient.getNewJobId();/ 生成 jobId rpc Path submitJobDir = new Path(getSystemDir(), jobId.toString();/ 用 jobId 來 建立 job 任務(wù)的提交根路徑 Path submitJobFile = new Path(submitJobDir, job.xml);/ 生成 job.xml ,這個 xml 將要記 錄 Configuration 中的所有配置信息,這個貌似與我們的 NEMR 的 xml 配置功能相似。 configureCommandLine

9、Options() ;/ 其中按 job 提交路徑調(diào)用 FileSystem 在其中建立虛擬 路徑并,并把要執(zhí)行的自定義程序打包成 jar 然后傳到 FileSystem 中已定義的 jar 文件路徑 中。 job.getOutputFormat().checkOutputSpecs(getFs(), job);/ 從這里去 FileSystem 中查看輸出 路徑是否已存在,如果已存在則報(bào)已存在異常。否則繼續(xù)執(zhí)行。 maps = writeNewSplits(context, submitSplitFile); / 這一步的主要目標(biāo)是將分割好的 inputsplit 數(shù)組信息寫到 Dfs 中,

10、然后把寫入的路徑添加到 Configuration 中 job.getJobConf();/ 獲取已定 input=ReflectionUtils.newInstance(job.getInputFormatClass(), 義的 inputformat 類,如果未定義默認(rèn)為 TextInputFormat 。 splits=input.getSplits(job);/ 獲取分配好的的 InputSplit 集合。與我們的 NEMR 一樣這里 同樣需要輸入文件的 wn 地址,以 TextInputFormat 舉例, TextInputFormat 需要繼承 FileInputFormat 類

11、, FileInputFormat 類中已實(shí)現(xiàn) getSplits() 方法,在 getSplits() 方法中需要用 到 FileSystem, 來獲取輸入文件的主機(jī)地址、長度、起點(diǎn)信息。 DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, array.length);/這一步 的目標(biāo)是講分割好的 InputSplit 信息寫入到 DFS 中,包括頭文件、版本號、數(shù)組長度 Serializer serializer = factory.getSerializer(Class) array0.getClass(

12、);/ 在這里主要 目標(biāo)是將已經(jīng)分割好的 InputSplit 信息寫到 Configuration 中。 FSDataOutputStream out = FileSystem.create(getFs(), submitJobFile, new FsPermission (JOB_FILE_PERMISSION); try job.writeXml(out); finally out.close(); / 從這里我們看到上面代碼中在 fs 中建立的 job.xml ,具體就是對 job.xml 把 Configuration 的配置信息寫在 job.xml 里。 JobStatus sta

13、tus = jobSubmitClient.submitJob(jobld); /這里通知 JobTracker,發(fā)送 jobld 過 去,由 JobTracker 根據(jù) jobId 去執(zhí)行。 JobClient 里面使用使用 RPC 機(jī)制來構(gòu)造一個實(shí)現(xiàn) JobSubmissionProtocol 接口的 JobTracker 的代理,然后利用遠(yuǎn)程發(fā)放直接執(zhí)行 JobTracker 里 的submitJob,與我們的利用Socket通信略有不同。 DN JobTracker JobTracker的地位相當(dāng)于我們的Master,它負(fù)責(zé)調(diào)度job的每一個子任務(wù) task運(yùn)行于 slave 上,并監(jiān)

14、控它們,如果發(fā)現(xiàn)有失敗的 task 就重新運(yùn)行它。 JobTracker 一直在等待 JobClient 通過 RPC 提交作業(yè),而 TaskTracker 一直通過 RPC 向 JobTracker 發(fā)送心跳 heartbeat 詢問有沒有任務(wù)可做, 如果有, 讓其派發(fā)任務(wù)給它執(zhí)行。 如果 JobTracker 的作業(yè)隊(duì)列不為空 , 則 TaskTracker 發(fā)送的心跳將會獲得 JobTracker 給它派發(fā)的任務(wù)。這是一道 pull 過程 : slave 主動向master拉生意。slave節(jié)點(diǎn)的TaskTracker接到任務(wù)后在其本地發(fā)起Task,然后執(zhí)行任 務(wù)。 啟動 : 有一個 m

15、ain 函數(shù),不過這里我們可以先從它著手開始分析。 tracker = new JobTracker(conf);/構(gòu)造 構(gòu)造函數(shù)先獲取一堆常量的值,然后清空systemDir,接著啟動RPC服務(wù)器。 lnetSocketAddress addr = getAddress(conf); this.localMachine = addr.getHostName(); this.port = addr.getPort(); erTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);

16、erTrackerServer.start(); 啟動 TracklnfoServer: Port = conf.getlnt(.port, 50030); Server = new JobTrackerlnfoServer(this, infoPort); Server.start(); JoblnProgress JobClient提交job后,JobTracker會創(chuàng)建一個 JoblnProgress來跟蹤和調(diào)度這個job,并 把它添加到j(luò)ob隊(duì)列里。JoblnProgress會根據(jù)提交的

17、job jar中定義的輸入數(shù)據(jù)集(已分解成 FileSplit )創(chuàng)建對應(yīng)的一批 TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時在創(chuàng)建指定數(shù)目 的 Task In Progress 用于監(jiān)控和調(diào)度 ReduceTask,缺省為 1 個 ReduceTask。 TaskInProgress 每個TasklnProgress就代表一個 map或一個reduce處理,它將Job里面的Map和Reduce 操作進(jìn)行了封裝,不過在Master端生成的Task In Progress只是初始化了信息但并不調(diào)用執(zhí)行 方法。它等待 TaskTracker端的RPC調(diào)用。 JobTracker.

18、heartbeat(); 這個方法主要是 TaskTracker 端遠(yuǎn)程調(diào)用時用到的方法, 其主要作用就是分派具體任務(wù), 并將該任務(wù)分發(fā)到 TaskTracker 端: 其關(guān)鍵代碼為: 1 HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); 2 List actions = new ArrayList(); 3 if (tasks = null) 4 tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName); 5 6 if (tas

19、ks != null) 7 for (Task task : tasks) 8 expireLaunchingTasks.addNewTask(task.getTaskID(); 9 actions.add(new LaunchTaskAction(task); 10 11 這里的調(diào)度中主要會根據(jù)我們的任務(wù)的輸入文件 dfs 中的文件存放節(jié)點(diǎn)來分配, 數(shù)據(jù)在 哪個節(jié)點(diǎn)上任務(wù)就分配到哪個節(jié)點(diǎn)上的 TaskTracker 中。 簡單來說這里會根據(jù)我們的上面提到的Job In Progress和Task In Porgress調(diào)度生成具體的 MapTask 和 ReuceTask 實(shí)例,他們均繼承自

20、抽象類 Task 該實(shí)例會放入到 LaunchTaskAction 中,最后獲得的任務(wù)列表會被返回到TaskTracker端。Task會在下面做簡單介紹。 代碼流程解析: 第一步:當(dāng) JobClient 向 JobTracker 通信時利用 RPC 調(diào)用 JobTracker 的 submitJob()方法 時,中會生成一個 JobInProgress 類的實(shí)例,該實(shí)例是為了記錄當(dāng)前這個 Job 任務(wù)的執(zhí)行狀 況。在構(gòu)造的同時會調(diào)用到FileSystem,把在JobClient端上傳的所有任務(wù)文件下載到本地 的文件系統(tǒng)中的臨時目錄里。見程序清單1,這其中包括上傳的 *.jar 文件包、記錄配置

21、信 息的 xml 、記錄分割信息的文件。 第二步:生成的 JobInProgress 實(shí)例會加載到作業(yè)隊(duì)列管理器 QueueManager 中,通過 QueueManager 來調(diào)度。然后在流程中執(zhí)行到 jobtracker.initJob(),執(zhí)行 jobinProgress.initTask(); 根據(jù)初始化的信息生成每一個 map與reduce,每個 map與reduce都是TaskInProgress, TaskI nProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的 Map Task或者ReduceTask。生成的map與 reduce 監(jiān)控類會被維護(hù)在 JobInProgress 實(shí)例中

22、。 第三步:首先步驟 JobInProgress 會創(chuàng)建 map 的監(jiān)控對象 , 在 initTasks() 函數(shù)里通過調(diào)用 JobClient 的 readSplitFile() 獲得已分解的輸入數(shù)據(jù)的 RawSplit 列表該列表已經(jīng)在初始化時下 載到本地了,然后根據(jù)這個列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象Task In Progress。在這個 過程中,還會記錄該 RawSplit 塊對應(yīng)的所有在 FILESYSTEM 里的 blocks 所在的 DataNode 節(jié)點(diǎn)的host,這個會在RawSplit創(chuàng)建時通過FileSplit的函數(shù)獲取,該函數(shù)會調(diào)用 FileSystem 的函數(shù)獲

23、得。其次 JobInProgress 會創(chuàng)建 Reduce 的監(jiān)控對象,這個比較簡單,根據(jù) JobConf 里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個Reduce任務(wù)。監(jiān)控和調(diào)度 Reduce任務(wù)的也是 TaskInProgress 類,不過構(gòu)造方法有所不同。 WN TaskTracker Task 的執(zhí)行實(shí)際是由 TaskTracker 發(fā)起的, TaskTracker 會定期(缺省為 3 秒鐘,參見 MRConstants 類中定義的 HEARTBEAT_INTERV AL 變量)與 JobTracker 進(jìn)行一次通信,報(bào) 告自己Task的執(zhí)行狀態(tài),接收 JobTracker的指令等,T

24、askTracker里面會通過循環(huán)的方式查 找。 TaskTracker$TaskLauncher TaskLauncher 是個繼承了 Thread 線程的 TaskTracker 的內(nèi)部類,在這里面會維護(hù)一個 TaskInProgress 的鏈表: private List tasksToLaunch; 該列表中的每個 TaskInProgress 實(shí)例對應(yīng)一個 TaskUnit 任務(wù)。 該類中的 run 方法才是主體關(guān)鍵之處,他會循環(huán)判斷是否 tasksToLaunch 中有新任務(wù)要 做,有就去從該列表中拿出來然后去調(diào)用TaskTracker.startNewTask(TaskInpro

25、gress);去開啟一 個新任務(wù)。 Task 在這里有兩個子類,分別是 MapTask 和 ReduceTask。 TaskTracker$TaskInProgress 這里的 TaskInProgress 主要是對每個執(zhí)行的任務(wù)的監(jiān)控和具體調(diào)度。 TaskTracker.localizeJob(); 此函數(shù)主要任務(wù)是初始化工作目錄 workDir ,再將 job jar 包從 HDFS 復(fù)制到本地文件系 統(tǒng)中,調(diào)用Run Jar.u nJar()將包解壓到工作目錄。然后創(chuàng)建一個 Runnin gJob并調(diào)用 addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊(duì)列中。完成后即

26、調(diào)用launchTaskForJob() 開始執(zhí)行 Task。 TaskTracker.launchTaskForJob() ; 主要是設(shè)置參數(shù),并調(diào)用TaskTracker$Taskl nProgress.lau nchTask()來啟動已創(chuàng)建的 Task 任務(wù)。 JvmManager 任務(wù)執(zhí)行的主體調(diào)度。并管理任務(wù)隊(duì)列,維護(hù)map與reduce的雙重任務(wù)隊(duì)列。這里面 會根據(jù)取出的任務(wù)去生成一個對應(yīng)的JvmRunner類,JvmRunner繼承自Thread,其run函數(shù) 主體會調(diào)用 runChild() 方法, runChild 會執(zhí)行主體函數(shù)。 MapTask.run(); initia

27、lize(job, getJoblD(), reporter, useNewApi); if (useNewApi) runNewMapper(job, split, umbilical, reporter); else runOldMapper(job, split, umbilical, reporter); 這里主要包括兩個關(guān)鍵塊:一個是 initialize 這里會根據(jù)預(yù)設(shè)的 OutputFormat 來格式輸 出,然后就是從 runNewMapper() 或 runOldMapper() 處執(zhí)行 map 任務(wù),用 runNewMapper 舉 例:從代碼清單2中我們看出這里的執(zhí)行過程

28、,首先我們的程序先生成我們?nèi)蝿?wù)的map類, 然后生成 map 任務(wù)的數(shù)據(jù)輸入格式類,并根據(jù)我們的數(shù)據(jù)輸入格式將我們的這塊的數(shù)據(jù)分 割成我們的指定的輸入數(shù)據(jù)就是RecordReader,然后將RecordReader作為輸入循環(huán)調(diào)用map 的最終map()方法,也就是我們的客戶端的主體 map方法。 ReduceTask.run(); 與 map 的開始過程一樣,不再重復(fù)了,就是在后面有所不同,首先reduce 的數(shù)據(jù)會在 操作前利用 Merge 函數(shù)合并一下,然后生成 key、value 對遍歷對象,然后執(zhí)行循環(huán)執(zhí)行 Reducer.reduce(),結(jié)果上傳到 fs 中。 代碼流程: 啟動:

29、 TaskTracker 的啟動的時候會加載所有信息,包括利用 RPC 獲得 JobTracker 的 RPC 變量定義為 jobClient , TaskTracker.run() 方法會去循環(huán)向 JobTracker 心跳,在里面主要 調(diào)用 TaskTracker. offerService() 方法, offerService 方法調(diào)用 JobTracker.transmitHeartBeat() 方 法,去執(zhí)行jobClient.heartbeat()(也就是上面我們介紹的JobTracker.heartbeat();)返回心跳信息 HeartbeatResponse 類,所有 map

30、 或 reduce 信息就在類 LaunchTaskAction( 每個 maptask 或 reducetask 對應(yīng)一個獨(dú)自的 LaunchTaskAction 實(shí)例, LaunchTaskAction 類實(shí)現(xiàn)了抽象類 LaunchTaskAction) 的實(shí)例里面。 TaskTrackerAction actions = heartbeatResponse.getActions(); if (actions != null) for (TaskTrackerAction taskaction: actions) if (taskaction instanceof LaunchTaskAc

31、tion) addToTaskQueue(LaunchTaskAction) taskaction); 根據(jù)獲得的taskaction,循環(huán)添加到 map與reduce維護(hù)列表中,如下: private TaskLauncher mapLauncher; private TaskLauncher reduceLauncher; 從 TaskLauncher 的主體中會執(zhí)行開啟新的任務(wù)。這個在上面已經(jīng)對 TaskLauncher 做了 詳細(xì)介紹。 在上面代碼中看到的 addToTaskQueue () 方法在調(diào)用的時候會調(diào)用到 TaskLauncher. addToTaskQueue ()該方法

32、體內(nèi)會調(diào)用 TaskTracker.registerTask()該方法會根據(jù)每個taskaction 生成一個 TaskInProgress 的實(shí)例, TaskInProgress 與 DN 中的 TaskInProgress 不同,這里的 TaskInProgress 是 TaskTracker 的內(nèi)部類, 它是每個任務(wù)運(yùn)行的主體, 生成的實(shí)例會被添加到 任務(wù)列表中去,就是我們在上面介紹過的tasksToLau nch。該類中的主體方法會循環(huán)查找任 務(wù)去執(zhí)行。如 TaskLauncher類介紹,調(diào)用到的 TaskTracker.startNewTask()方法開啟一個新的 任務(wù),然后會去執(zhí)行

33、關(guān)鍵方法TaskTracker. localizeJob(任務(wù))初始化所需要的信息,和工作目 錄的創(chuàng)建和調(diào)用 hdfs下載執(zhí)行的jar包和配置信息xml,完成向監(jiān)控隊(duì)列的添加后會去調(diào)用 launchTaskForJob(),然后 launchTaskForJob()調(diào)用 TaskTracker$TaskInProgress.launchTask()開 始執(zhí)行 Task。 1 TaskTracker$Task In Progress.lau nchTask()調(diào)用的分析: 2 localizeTask(task); 3 this.runner = task.createRunner(TaskTr

34、acker.this, this); 4 this.runner.start(); 通過 localizeTask 方法根據(jù)當(dāng)前的任務(wù)創(chuàng)建工作目錄,并把所需要的數(shù)據(jù)信息下載到本 地。如果是 Map任務(wù)的話就是 MapTask.createRunner();方法會去創(chuàng)建 MapTaskRunner,如果 是 Reduce 的話就是 ReduceTask.createRunner();方法去創(chuàng)建 ReduceTaskRunner。TaskRunner 是繼承 Thread 的類,在它的 run 函數(shù)主體執(zhí)行的過程比較復(fù)雜,主要的工作就是初始化啟 動java子進(jìn)程的一系列環(huán)境變量,包括設(shè)定工作目錄

35、workDir,設(shè)置CLASSPATH環(huán)境變量 等(需要將TaskTracker的環(huán)境變量以及job jar的路徑合并起來),然后裝載job jar包。代 碼往下走: jvmManageraunchJvm(); 這里是執(zhí)行的主體,函數(shù)體內(nèi)會判斷是map任務(wù)還是reduce任務(wù),并把它加載到管理 隊(duì)列里面。從JvmMa nager中我們看到run Child()方法的調(diào)用,方法中會去調(diào)用 DefaultTaskController 中的 launchTaskJVM()方法,DefaultTaskController.launchTaskJVM()會 根據(jù)已經(jīng)獲取的環(huán)境變量和jvm運(yùn)行堆大小設(shè)定等

36、參數(shù),利用ProcessBuilder自動生成shell 腳本,并運(yùn)行來設(shè)定wn節(jié)點(diǎn)的變量信息。 在裝載過的信息中有個類是Child它自動去調(diào)用TaskTracker.getTask()方法去取出當(dāng)前的 task任務(wù),然后調(diào)用run()方法在這里同樣會用到hdfs從中取出輸入數(shù)據(jù),然后根據(jù)判斷是 舊api還是新api去執(zhí)行對應(yīng)的方法。具體在MapTask.run();和ReduceTask.run();中解釋。 與job執(zhí)行相關(guān)的接口、類。與以前版本一樣,這里沒發(fā)生變化。 刑xmcy : TakTrackefStatva. irt iniiiaiCortiait: boat* in nncep

37、fNewTaiila;in 尺*.的,- slurt/ : lalcr I ntckcrProlucul i I :JobTruktr 1 hujnbtfir(ir, stilus: TaskliuiktfrSmtus, in血口 : book lii Tiisks hnuk ill twpunsckl ikin); riftirTtiCdiRrpMtMr rukl rvckerStalw TakTracktr I uskl nit ktrAcliuD Kill 1 akActinn KLllJubAirEiu runChiltk in Lirty : in dir : l ik. in Im

38、Udd : Ktriag) 2-3-2與執(zhí)fJob仃瓷向梵和力法 程序清單1. this . localFs = jobtracker.getLocalFileSystem();/hdfs JobConf default_job_conf = this . localJobFile new JobConf(default_conf); =default_job_conf.getLocalPath(JobTracker. + /+ jobid + .xml); this . localJarFile = default_job_conf.getLocalPath(JobTracker. .jar

39、 ); + /+ jobid + Path jobDir = jobtracker.getSystemDirectoryForJob( jobId ); fs = jobtracker.getFileSystem(jobDir); jobFile = new Path(jobDir, job.xml ); fs .copyToLocalFile( jobFile , localJobFile ); SUBDIR SUBDIR conf = new JobConf( localJobFile ); this . priority = conf .getJobPriority(); this .

40、status .setJobPriority(this . priority ); this . profile = new JobProfile( conf .getUser(), jobid, jobFile .toString(), url,conf .getJobName(), conf .getQueueName(); String jarFile =conf .getJar(); if (jarFile !=null ) fs .copyToLocalFile( new Path(jarFile), localJarFile ); conf .setJar( localJarFil

41、e .toString(); 代碼清單2 / make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext( new job, getTaskID(); / make a mapper org.apache.hadoop.mapreduce.MapperVNKEY, INVALUE, OUTKEY, OUTVALUEmapper = (org.apache.hadoop.mapreduce.MapperVNKEY, INVALUE, OUTKEY, OUTVALUE) R

42、eflectionUtils .newInstance (taskContext.getMapperClass(), job); / make the input format org.apache.hadoop.mapreduce.InputFormatVNKEY, INVALUE inputFormat = (org.apache.hadoop.mapreduce.InputFormatVNKEY, INVALUE) ReflectionUtils .newInstance (taskContext.getInputFormatClass(), job); / rebuild the in

43、put split org.apache.hadoop.mapreduce .In putSplit split =null ; DataInputBuffer splitBuffer =new DataInputBuffer(); splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength(); SerializationFactory factory =new SerializationFactory(job); Deserializer extends org.apache.hadoop.mapreduce .In putSp

44、lit deserializer = (Deserializerextends org.apache.hadoop.mapreduce .In putSplit) factory .getDeserializer(job.getClassByName(splitClass ); deserializer.open(splitBuffer); split = deserializer.deserialize(null ); org.apache.hadoop.mapreduce.RecordReaderVNKEY, INVALUE input =new NewTrackingRecordReaderVNKEY, INVALUE( inputFormat.createRecordReader(split, taskContext), reporter); job.setBoolean( mapred.skip.on , isSkipping(); or

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論