当前位置:首页 » 编程语言 » mapreducejava

mapreducejava

发布时间: 2022-12-27 02:56:50

㈠ 如何通过java程序提交yarn的maprece计算任务

由于项目需求,需要通过Java程序提交Yarn的MapRece的计算任务。与一般的通过Jar包提交MapRece任务不同,通过程序提交MapRece任务需要有点小变动,详见以下代码。
以下为MapRece主程序,有几点需要提一下:
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。
2、为了控制rece的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个rece容器中。这样,当相同key1的数据进入rece容器后,key2起到了一个数据标识的作用。

㈡ hadoop 2.2.0 maprece java.lang.NullPointerException

能再具体点吗,感觉问题出在创建临时目录时,JobSubmissionFiles.getStagingDir,FilterFileSystem.mkdirs,RawLocalFileSystem.setPermission,可能是权限问题,话说在如果可以的话最好不要在Eclipse下跑Maprece,打包到集群最好

㈢ 如何快速地编写和运行一个属于自己的MapRece例子程序

大数据的时代, 到处张嘴闭嘴都是Hadoop, MapRece, 不跟上时代怎么行? 可是对一个hadoop的新手, 写一个属于自己的MapRece程序还是小有点难度的, 需要建立一个maven项目, 还要搞清楚各种库的依赖, 再加上编译运行, 基本上头大两圈了吧。 这也使得很多只是想简单了解一下MapRece的人望而却步。
本文会教你如何用最快最简单的方法编写和运行一个属于自己的MapRece程序, let's go!
首先有两个前提:
1. 有一个已经可以运行的hadoop 集群(也可以是伪分布系统), 上面的hdfs和maprece工作正常 (这个真的是最基本的了, 不再累述, 不会的请参考 http://hadoop.apache.org/docs/current/)
2. 集群上安装了JDK (编译运行时会用到)
正式开始
1. 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapRece程序, 而不是如何写好一个功能齐全的MapRece程序)
内容如下:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package org.apache.hadoop.examples;
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2. 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath

保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

3. 编译
运行命令
javac -classpath xxx ./myword.java

xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 将class文件打包成.jar文件
运行命令
jar -cvf myword.jar ./*.class

至此, 目标jar 文件成功生成
5. 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put ./mapred_test/

确保此文件夹成功上传到hdfs 当前用户根目录下
6. 运行我们的程序
运行命令
hadoop jar ./myword.jar myword mapred_test output

顺利的话, 此命令会正常进行, 一个MapRece job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapRece job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
转载

㈣ maprece什么语言编程

maprece是hadoop的一个分布式计算框架,使用的是java语言编写

㈤ 用Java写MapRece,用python和R,哪种更适合从事数据行业,做数据...

必然python啊,不过R也很好。python更加灵活,但是R是这一方面的功能一点不弱。但是我感觉很多算法拿python实现会更容易,而且python更好学,语法更简洁。具体看个人。

㈥ 如何将java类对象作为maprece中map函数的输入

1.首先介绍一下wordcount 早maprece框架中的 对应关系
大家都知道 maprece 分为 map 和rece 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 rece;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为rece的入参分发给rece,然后在rece中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、rece中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在rece的时候才判断的
三、map过程到底做了什么,rece过程到底做了什么?为什么它能够做到多个map多个rece?

一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}

我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:maprece.input.num.files。

二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;

FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类

在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象

那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

下面继续看看这些RecordReader是如何被MapRece框架使用的

终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }

我们写MapRece程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}

RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

我们可以想象 这里 应该被框架调用的可能性比较大了,那么maprece 框架是怎么分别来调用map和rece呢?
还以为分析完map就完事了,才发现这里仅仅是做了maprece 框架调用前的一些准备工作,

还是继续分析 下 maprece 框架调用吧:

1.在 job提交 任务之后 首先由jobtrack 分发任务,

在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper

在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。

热点内容
怎样删除小视频文件夹 发布:2024-05-19 05:49:29 浏览:589
开启php短标签 发布:2024-05-19 05:44:12 浏览:473
android各国语言 发布:2024-05-19 05:42:54 浏览:247
微信什么资料都没怎么找回密码 发布:2024-05-19 05:35:34 浏览:907
填志愿密码是什么 发布:2024-05-19 05:30:23 浏览:318
城堡争霸自动掠夺脚本 发布:2024-05-19 05:22:06 浏览:204
asp编程工具 发布:2024-05-19 05:20:36 浏览:143
insertpython 发布:2024-05-19 05:12:26 浏览:244
androidant编译 发布:2024-05-19 05:04:11 浏览:988
按键脚本优化 发布:2024-05-19 04:59:57 浏览:752