mapreducejava
㈠ 如何通過java程序提交yarn的maprece計算任務
由於項目需求,需要通過Java程序提交Yarn的MapRece的計算任務。與一般的通過Jar包提交MapRece任務不同,通過程序提交MapRece任務需要有點小變動,詳見以下代碼。
以下為MapRece主程序,有幾點需要提一下:
1、在程序中,我將文件讀入格式設定為WholeFileInputFormat,即不對文件進行切分。
2、為了控制rece的處理過程,map的輸出鍵的格式為組合鍵格式。與常規的<key,value>不同,這里變為了<TextPair,Value>,TextPair的格式為<key1,key2>。
3、為了適應組合鍵,重新設定了分組函數,即GroupComparator。分組規則為,只要TextPair中的key1相同(不要求key2相同),則數據被分配到一個rece容器中。這樣,當相同key1的數據進入rece容器後,key2起到了一個數據標識的作用。
㈡ hadoop 2.2.0 maprece java.lang.NullPointerException
能再具體點嗎,感覺問題出在創建臨時目錄時,JobSubmissionFiles.getStagingDir,FilterFileSystem.mkdirs,RawLocalFileSystem.setPermission,可能是許可權問題,話說在如果可以的話最好不要在Eclipse下跑Maprece,打包到集群最好
㈢ 如何快速地編寫和運行一個屬於自己的MapRece例子程序
大數據的時代, 到處張嘴閉嘴都是Hadoop, MapRece, 不跟上時代怎麼行? 可是對一個hadoop的新手, 寫一個屬於自己的MapRece程序還是小有點難度的, 需要建立一個maven項目, 還要搞清楚各種庫的依賴, 再加上編譯運行, 基本上頭大兩圈了吧。 這也使得很多隻是想簡單了解一下MapRece的人望而卻步。
本文會教你如何用最快最簡單的方法編寫和運行一個屬於自己的MapRece程序, let's go!
首先有兩個前提:
1. 有一個已經可以運行的hadoop 集群(也可以是偽分布系統), 上面的hdfs和maprece工作正常 (這個真的是最基本的了, 不再累述, 不會的請參考 http://hadoop.apache.org/docs/current/)
2. 集群上安裝了JDK (編譯運行時會用到)
正式開始
1. 首先登入hadoop 集群裡面的一個節點, 創建一個java源文件, 偷懶起見, 基本盜用官方的word count (因為本文的目的是教會你如何快編寫和運行一個MapRece程序, 而不是如何寫好一個功能齊全的MapRece程序)
內容如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class myword {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
與官方版本相比, 主要做了兩處修改
1) 為了簡單起見,去掉了開頭的 package org.apache.hadoop.examples;
2) 將類名從 WordCount 改為 myword, 以體現是我們自己的工作成果 :)
2. 拿到hadoop 運行的class path, 主要為編譯所用
運行命令
hadoop classpath
保存打出的結果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:
3. 編譯
運行命令
javac -classpath xxx ./myword.java
xxx部分就是上一步裡面取到的class path
運行完此命令後, 當前目錄下會生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 將class文件打包成.jar文件
運行命令
jar -cvf myword.jar ./*.class
至此, 目標jar 文件成功生成
5. 准備一些文本文件, 上傳到hdfs, 以做word count的input
例子:
隨意創建一些文本文件, 保存到mapred_test 文件夾
運行命令
hadoop fs -put ./mapred_test/
確保此文件夾成功上傳到hdfs 當前用戶根目錄下
6. 運行我們的程序
運行命令
hadoop jar ./myword.jar myword mapred_test output
順利的話, 此命令會正常進行, 一個MapRece job 會開始工作, 輸出的結果會保存在 hdfs 當前用戶根目錄下的output 文件夾裡面。
至此大功告成!
如果還需要更多的功能, 我們可以修改前面的源文件以達到一個真正有用的MapRece job。
但是原理大同小異, 練手的話, 基本夠了。
一個拋磚引玉的簡單例子, 歡迎板磚。
轉載
㈣ maprece什麼語言編程
maprece是hadoop的一個分布式計算框架,使用的是java語言編寫
㈤ 用Java寫MapRece,用python和R,哪種更適合從事數據行業,做數據...
必然python啊,不過R也很好。python更加靈活,但是R是這一方面的功能一點不弱。但是我感覺很多演算法拿python實現會更容易,而且python更好學,語法更簡潔。具體看個人。
㈥ 如何將java類對象作為maprece中map函數的輸入
1.首先介紹一下wordcount 早maprece框架中的 對應關系
大家都知道 maprece 分為 map 和rece 兩個部分,那麼在wordcount例子中,很顯然 對文件word 計數部分為map,對 word 數量累計部分為 rece;
大家都明白 map接受一個參數,經過map處理後,將處理結果作為rece的入參分發給rece,然後在rece中統計了word 的數量,最終輸出到輸出結果;
但是初看遇到的問題:
一、map的輸入參數是個 Text之類的 對象,並不是 file對象
二、rece中並沒有if-else之類的判斷語句 ,來說明 這個word 數量 加 一次,那個word 加一次。那麼這個判斷到底只是在 map中已經區分了 還是在rece的時候才判斷的
三、map過程到底做了什麼,rece過程到底做了什麼?為什麼它能夠做到多個map多個rece?
一、
1. 怎麼將 文件參數 傳遞 到 job中呢?
在 client 我們調用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
實際上 addInputPath 做了以下的事情(將文件路徑載入到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
我們再來看看 FileInputFormat 是做什麼用的, FileInputFormat 實現了 InputFormat 介面 ,這個介面是hadoop用來接收客戶端輸入參數的。所有的輸入格式都繼承於InputFormat,這是一個抽象類,其子類有專門用於讀取普通文件的FileInputFormat,用來讀取資料庫的DBInputFormat等等。
我們會看到 在 InputFormat 介面中 有getSplits方法,也就是說分片操作實際上實在 map之前 就已經做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具體實現參考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它們會被用來計算分片大小。可以通過設置mapred.min.split.size和mapred.max.split.size來設置。splits鏈表用來存儲計算得到的輸入分片,files則存儲作為由listStatus()獲取的輸入文件列表。然後對於每個輸入文件,判斷是否可以分割,通過computeSplitSize計算出分片大小splitSize,計算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize。然後我們根據這個splitSize計算出每個文件的inputSplits集合,然後加入分片列表splits中。注意到我們生成InputSplit的時候按上面說的使用文件路徑,分片起始位置,分片大小和存放這個文件的hosts列表來創建。最後我們還設置了輸入文件數量:maprece.input.num.files。
二、計算出來的分片有時怎麼傳遞給 map呢 ?對於單詞數量如何累加?
我們使用了 就是InputFormat中的另一個方法createRecordReader() 這個方法:
RecordReader:
RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類,我們可以將其看作是在InputSplit上的迭代器。我們從API介面中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。
可以看到介面中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat
對於 wordcount 測試用了 NLineInputFormat和 TextInputFormat 實現類
在 InputFormat 構建一個 RecordReader 出來,然後調用RecordReader initialize 的方法,初始化RecordReader 對象
那麼 到底 Map是怎麼調用 的呢? 通過前邊我們 已經將 文件分片了,並且將文件分片的內容存放到了RecordReader中,
下面繼續看看這些RecordReader是如何被MapRece框架使用的
終於 說道 Map了 ,我么如果要實現Map 那麼 一定要繼承 Mapper這個類
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }
我們寫MapRece程序的時候,我們寫的mapper都要繼承這個Mapper.class,通常我們會重寫map()方法,map()每次接受一個K-V對,然後我們對這個K-V對進行處理,再分發出處理後的數據。我們也可能重寫setup()以對這個map task進行一些預處理,比如創建一個List之類的;我們也可能重寫cleanup()方法對做一些處理後的工作,當然我們也可能在cleanup()中寫出K-V對。舉個例子就是:InputSplit的數據是一些整數,然後我們要在mapper中算出它們的和。我們就可以在先設置個sum屬性,然後map()函數處理一個K-V對就是將其加到sum上,最後在cleanup()函數中調用context.write(key,value);
最後我們看看Mapper.class中的run()方法,它相當於map task的驅動,我們可以看到run()方法首先調用setup()進行初始操作,然後對每個context.nextKeyValue()獲取的K-V對,就調用map()函數進行處理,最後調用cleanup()做最後的處理。事實上,從context.nextKeyValue()就是使用了相應的RecordReader來獲取K-V對的。
我們看看Mapper.class中的Context類,它繼承與MapContext,使用了一個RecordReader進行構造。下面我們再看這個MapContext。
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
RecordReader 看來是在這里構造出來了, 那麼 是誰調用這個方法,將這個承載著關鍵數據信息的 RecordReader 傳過來了 ?
我們可以想像 這里 應該被框架調用的可能性比較大了,那麼maprece 框架是怎麼分別來調用map和rece呢?
還以為分析完map就完事了,才發現這里僅僅是做了maprece 框架調用前的一些准備工作,
還是繼續分析 下 maprece 框架調用吧:
1.在 job提交 任務之後 首先由jobtrack 分發任務,
在 任務分發完成之後 ,執行 task的時候,這時 調用了 maptask 中的 runNewMapper
在這個方法中調用了 MapContextImpl, 至此 這個map 和框架就可以聯系起來了。